Wrap pthreads in a Thread class. Implement basic thread role checking.

This commit is contained in:
Jonathan Moore Liles 2008-06-01 20:13:18 -05:00
parent 08e50292c8
commit 9ae6c0ea5e
16 changed files with 283 additions and 51 deletions

View File

@ -26,6 +26,10 @@
#include "Audio_File.H"
#include "dsp.h"
#include "util/Thread.H"
/** Apply a (portion of) fade from /start/ to /end/ assuming a
* buffer size of /nframes/. /start/ and /end/ are relative to the
* given buffer, and /start/ may be negative. */
@ -63,7 +67,6 @@ Audio_Region::Fade::apply ( sample_t *buf, Audio_Region::Fade::fade_dir_e dir, l
*(buf++) *= gain( fi );
}
/* THREAD: IO */
/** read the overlapping part of /channel/ at /pos/ for /nframes/ of
this region into /buf/, where /pos/ is in timeline frames */
/* this runs in the diskstream thread. */
@ -78,6 +81,8 @@ Audio_Region::Fade::apply ( sample_t *buf, Audio_Region::Fade::fade_dir_e dir, l
nframes_t
Audio_Region::read ( sample_t *buf, nframes_t pos, nframes_t nframes, int channel ) const
{
THREAD_ASSERT( Playback );
const Range r = _range;
/* do nothing if we aren't covered by this frame range */
@ -193,12 +198,13 @@ Audio_Region::prepare ( void )
log_start();
}
/* THREAD: IO */
/** write /nframes/ from /buf/ to source. /buf/ is interleaved and
must match the channel layout of the write source! */
nframes_t
Audio_Region::write ( nframes_t nframes )
{
THREAD_ASSERT( Capture );
_range.length += nframes;
/* FIXME: too much? */
@ -230,12 +236,13 @@ Audio_Region::write ( nframes_t nframes )
return nframes;
}
/* THREAD: IO */
/** finalize region capture. Assumes that this *is* a captured region
and that no other regions refer to the same source */
bool
Audio_Region::finalize ( nframes_t frame )
{
THREAD_ASSERT( Capture );
DMESSAGE( "finalizing capture region" );
_range.length = frame - _range.start;

View File

@ -21,6 +21,8 @@
#include "dsp.h"
#include "util/Thread.H"
using namespace std;
@ -29,12 +31,13 @@ using namespace std;
/* Engine */
/**********/
/* THREAD: IO */
/** determine region coverage and fill /buf/ with interleaved samples
* from /frame/ to /nframes/ for exactly /channels/ channels. */
nframes_t
Audio_Sequence::play ( sample_t *buf, nframes_t frame, nframes_t nframes, int channels )
{
THREAD_ASSERT( Playback );
sample_t *cbuf = new sample_t[ nframes ];
memset( cbuf, 0, nframes * sizeof( sample_t ) );
@ -65,10 +68,3 @@ Audio_Sequence::play ( sample_t *buf, nframes_t frame, nframes_t nframes, int ch
/* FIXME: bogus */
return nframes;
}
/* /\* THREAD: RT *\/ */
/* nframes_t */
/* Audio_Sequence::process ( nframes_t nframes ) */
/* { */
/* return disktream->process( nframes ); */
/* } */

View File

@ -21,6 +21,8 @@
#include "../Transport.H" // for ->frame
#include "util/Thread.H"
#include <list>
using std::list;
@ -45,12 +47,13 @@ sigmoid_interpolate ( float y1, float y2, float mu )
/* THREAD: RT */
/** fill buf with /nframes/ of interpolated control curve values
* starting at /frame/ */
nframes_t
Control_Sequence::play ( sample_t *buf, nframes_t frame, nframes_t nframes )
{
THREAD_ASSERT( RT );
Control_Point *p2, *p1 = (Control_Point*)&_widgets.front();
nframes_t n = nframes;
@ -84,11 +87,11 @@ Control_Sequence::play ( sample_t *buf, nframes_t frame, nframes_t nframes )
return nframes - n;
}
/* THREAD: RT */
nframes_t
Control_Sequence::process ( nframes_t nframes )
{
THREAD_ASSERT( RT );
if ( _output->connected() ) /* don't waste CPU on disconnected ports */
{
void *buf = _output->buffer( nframes );

View File

@ -58,7 +58,6 @@ Disk_Stream::Disk_Stream ( Track *track, float frame_rate, nframes_t nframes, in
assert( channels );
_frame = 0;
_thread = 0;
_terminate = false;
_pending_seek = -1;
_xruns = 0;
@ -87,11 +86,11 @@ Disk_Stream::~Disk_Stream ( )
}
/* THREAD: RT */
/** flush buffers and reset. Must only be called from the RT thread. */
void
Disk_Stream::base_flush ( bool is_output )
{
THREAD_ASSERT( RT );
/* flush buffers */
for ( int i = _rb.size(); i--; )
@ -132,7 +131,7 @@ Disk_Stream::detach ( void )
block_processed();
pthread_detach( _thread );
_thread.detach();
}
/** stop the IO thread. */
@ -144,8 +143,8 @@ Disk_Stream::shutdown ( void )
/* try to wake the thread so it'll see that it's time to die */
block_processed();
if ( _thread )
pthread_join( _thread, NULL );
if ( _thread.running() )
_thread.join();
}
Track *
@ -164,9 +163,9 @@ Disk_Stream::sequence ( void ) const
void
Disk_Stream::run ( void )
{
ASSERT( ! _thread, "Thread is already running" );
ASSERT( ! _thread.running(), "Thread is already running" );
if ( pthread_create( &_thread, NULL, &Disk_Stream::disk_thread, this ) != 0 )
if ( ! _thread.clone( &Disk_Stream::disk_thread, this ) )
FATAL( "Could not create IO thread!" );
}
@ -202,7 +201,7 @@ Disk_Stream::resize_buffers ( nframes_t nframes )
{
DMESSAGE( "resizing buffers" );
const bool was_running = _thread;
const bool was_running = _thread.running();
if ( was_running )
shutdown();

View File

@ -24,11 +24,11 @@
#include <semaphore.h>
#include <errno.h>
#include <pthread.h>
#include <vector>
#include "types.h"
#include "util/Mutex.H"
#include "util/Thread.H"
class Track;
class Audio_Sequence;
@ -40,9 +40,10 @@ class Disk_Stream : public Mutex
Disk_Stream ( const Disk_Stream &rhs );
Disk_Stream & operator = ( const Disk_Stream &rhs );
protected:
pthread_t _thread; /* io thread */
Thread _thread; /* io thread */
Track *_track; /* Track we belong to */

View File

@ -28,7 +28,9 @@
/* This is the home of the JACK process callback (does this *really*
need to be a class?) */
Engine::Engine ( )
#include "util/Thread.H"
Engine::Engine ( ) : _thread( "RT" )
{
_freewheeling = false;
_client = NULL;
@ -190,6 +192,9 @@ Engine::timebase ( jack_transport_state_t, jack_nframes_t, jack_position_t *pos,
int
Engine::process ( nframes_t nframes )
{
/* FIXME: wrong place for this */
_thread.set( "RT" );
transport->poll();
if ( freewheeling() )
@ -237,6 +242,18 @@ Engine::freewheeling ( bool yes )
WARNING( "Unkown error while setting freewheeling mode" );
}
void
Engine::thread_init ( void *arg )
{
((Engine*)arg)->thread_init();
}
void
Engine::thread_init ( void )
{
_thread.set( "RT" );
}
int
Engine::init ( void )
{
@ -245,6 +262,7 @@ Engine::init ( void )
#define set_callback( name ) jack_set_ ## name ## _callback( _client, &Engine:: name , this )
set_callback( thread_init );
set_callback( process );
set_callback( xrun );
set_callback( freewheel );

View File

@ -27,10 +27,15 @@ typedef jack_nframes_t nframes_t;
class Port;
#include "Thread.H"
class Engine : public Mutex
{
jack_client_t *_client;
Thread _thread; /* only used for thread checking */
/* I know locking out the process callback is cheating, even
though we use trylock... The thing is, every other DAW does
this too and you can hear it in the glitches Ardour and friends
@ -57,6 +62,8 @@ class Engine : public Mutex
void freewheel ( bool yes );
static int buffer_size ( nframes_t nframes, void *arg );
int buffer_size ( nframes_t nframes );
static void thread_init ( void *arg );
void thread_init ( void );
Engine ( const Engine &rhs );
Engine & operator = ( const Engine &rhs );

View File

@ -38,6 +38,8 @@
#include "assert.h"
#include "util/debug.h"
#include "util/Thread.H"
#include <errno.h>
#include <list>
@ -520,11 +522,12 @@ Peak::normalization_factor( void ) const
return s;
}
/* THREAD: IO */
/* wrapper for peak writer */
void
Peaks::prepare_for_writing ( void )
{
THREAD_ASSERT( Capture );
assert( ! _peak_writer );
_peak_writer = new Peaks::Streamer( _clip->name(), _clip->channels(), cache_minimum );
@ -544,10 +547,11 @@ Peaks::finish_writing ( void )
}
/* THREAD: IO */
void
Peaks::write ( sample_t *buf, nframes_t nframes )
{
THREAD_ASSERT( Capture );
_peak_writer->write( buf, nframes );
}

View File

@ -31,6 +31,7 @@
#include "dsp.h"
#include "util/debug.h"
#include "util/Thread.H"
bool
Playback_DS::seek_pending ( void )
@ -38,7 +39,6 @@ Playback_DS::seek_pending ( void )
return _pending_seek != (nframes_t)-1;
}
/* THREAD: RT */
/** request that the IO thread perform a seek and rebuffer. This is
called for each Disk_Stream whenever the RT thread determines that
the transport has jumped to a new position. This is called *before*
@ -46,6 +46,8 @@ Playback_DS::seek_pending ( void )
void
Playback_DS::seek ( nframes_t frame )
{
THREAD_ASSERT( RT );
DMESSAGE( "requesting seek" );
if ( seek_pending() )
@ -56,11 +58,11 @@ Playback_DS::seek ( nframes_t frame )
flush();
}
/* THREAD: IO */
/** read /nframes/ from the attached track into /buf/ */
void
Playback_DS::read_block ( sample_t *buf, nframes_t nframes )
{
THREAD_ASSERT( Playback );
memset( buf, 0, nframes * sizeof( sample_t ) * channels() );
@ -88,10 +90,11 @@ Playback_DS::read_block ( sample_t *buf, nframes_t nframes )
#define AVOID_UNNECESSARY_COPYING 1
/* THREAD: IO */
void
Playback_DS::disk_thread ( void )
{
_thread.name( "Playback" );
DMESSAGE( "playback thread running" );
/* buffer to hold the interleaved data returned by the track reader */
@ -198,15 +201,15 @@ Playback_DS::disk_thread ( void )
#endif
_terminate = false;
_thread = 0;
}
/* THREAD: RT */
/** take a single block from the ringbuffers and send it out the
* attached track's ports */
nframes_t
Playback_DS::process ( nframes_t nframes )
{
THREAD_ASSERT( RT );
const size_t block_size = nframes * sizeof( sample_t );
// printf( "process: %lu %lu %lu\n", _frame, _frame + nframes, nframes );

View File

@ -30,6 +30,7 @@
#include "dsp.h"
#include "util/debug.h"
#include "util/Thread.H"
const Audio_Region *
Record_DS::capture_region ( void ) const
@ -40,11 +41,11 @@ Record_DS::capture_region ( void ) const
return NULL;
}
/* THREAD: IO */
/** write /nframes/ from buf to the capture file of the attached track */
void
Record_DS::write_block ( sample_t *buf, nframes_t nframes )
{
THREAD_ASSERT( Capture );
/* stupid chicken/egg */
if ( ! ( timeline && sequence() ) )
@ -61,12 +62,16 @@ Record_DS::write_block ( sample_t *buf, nframes_t nframes )
#define AVOID_UNNECESSARY_COPYING 1
/* THREAD: IO */
void
Record_DS::disk_thread ( void )
{
_thread.name( "Capture" );
track()->record( _capture, _frame );
DMESSAGE( "capture thread running..." );
const nframes_t nframes = _nframes * _disk_io_blocks;
/* buffer to hold the interleaved data returned by the track reader */
@ -199,7 +204,6 @@ Record_DS::disk_thread ( void )
delete _capture;
_capture = NULL;
_thread = 0;
_terminate = false;
DMESSAGE( "capture thread gone" );
}
@ -209,6 +213,7 @@ Record_DS::disk_thread ( void )
void
Record_DS::start ( nframes_t frame )
{
THREAD_ASSERT( UI );
if ( _recording )
{
@ -216,15 +221,13 @@ Record_DS::start ( nframes_t frame )
return;
}
/* FIXME: safe to do this here? */
flush();
/* /\* FIXME: safe to do this here? *\/ */
/* flush(); */
_frame = frame;
_capture = new Track::Capture;
track()->record( _capture, frame );
run();
_recording = true;
@ -235,6 +238,8 @@ Record_DS::start ( nframes_t frame )
void
Record_DS::stop ( nframes_t frame )
{
THREAD_ASSERT( UI );
if ( ! _recording )
{
WARNING( "programming error: attempt to stop recording when no recording is being made" );
@ -251,11 +256,11 @@ Record_DS::stop ( nframes_t frame )
}
/* THREAD: RT */
/** read from the attached track's ports and stuff the ringbuffers */
nframes_t
Record_DS::process ( nframes_t nframes )
{
THREAD_ASSERT( RT );
if ( ! _recording )
return 0;

View File

@ -25,6 +25,8 @@
#include "Record_DS.H"
#include "Playback_DS.H"
#include "util/Thread.H"
/** Initiate recording for all armed tracks */
bool
Timeline::record ( void )
@ -90,10 +92,11 @@ Timeline::process ( nframes_t nframes )
return nframes;
}
/* THREAD: RT */
void
Timeline::seek ( nframes_t frame )
{
THREAD_ASSERT( RT );
for ( int i = tracks->children(); i-- ; )
{
Track *t = (Track*)tracks->child( i );
@ -114,10 +117,11 @@ Timeline::resize_buffers ( nframes_t nframes )
}
}
/* THREAD: RT */
int
Timeline::seek_pending ( void )
{
THREAD_ASSERT( RT );
int r = 0;
for ( int i = tracks->children(); i-- ; )

View File

@ -148,10 +148,11 @@ Track::configure_inputs ( int n )
return true;
}
/* THREAD: RT */
nframes_t
Track::process ( nframes_t nframes )
{
THREAD_ASSERT( RT );
if ( ! transport->rolling )
{
for ( int i = output.size(); i--; )
@ -175,10 +176,11 @@ Track::process ( nframes_t nframes )
return 0;
}
/* THREAD: RT */
void
Track::seek ( nframes_t frame )
{
THREAD_ASSERT( RT );
if ( playback_ds )
return playback_ds->seek( frame );
}
@ -207,13 +209,12 @@ uuid ( void )
return (unsigned long long) t;
}
/* THREAD: IO */
/** create capture region and prepare to record */
void
Track::record ( Capture *c, nframes_t frame )
{
THREAD_ASSERT( Capture );
char pat[256];
snprintf( pat, sizeof( pat ), "%s-%llu", name(), uuid() );
@ -231,11 +232,12 @@ Track::record ( Capture *c, nframes_t frame )
c->region->prepare();
}
/* THREAD: IO */
/** write a block to the (already opened) capture file */
void
Track::write ( Capture *c, sample_t *buf, nframes_t nframes )
{
THREAD_ASSERT( Capture );
nframes_t l = c->audio_file->write( buf, nframes );
c->region->write( l );
@ -243,10 +245,11 @@ Track::write ( Capture *c, sample_t *buf, nframes_t nframes )
#include <stdio.h>
/* THREAD: IO */
void
Track::finalize ( Capture *c, nframes_t frame )
{
THREAD_ASSERT( Capture );
c->region->finalize( frame );
DMESSAGE( "finalizing audio file" );
c->audio_file->finalize();

View File

@ -47,6 +47,8 @@
#include "Transport.H"
#include "Engine/Engine.H"
#include "util/Thread.H"
Engine *engine;
Timeline *timeline;
Transport *transport;
@ -85,6 +87,10 @@ ensure_dirs ( void )
int
main ( int argc, char **argv )
{
Thread::init();
Thread thread( "UI" );
thread.set();
fl_register_images();

View File

@ -4,7 +4,7 @@ Timeline_VERSION := 0.5.0
Timeline_SRCS := $(wildcard Timeline/*.C Timeline/*.fl Timeline/Engine/*.C)
Timeline_SRCS += util/debug.C
Timeline_SRCS += util/debug.C util/Thread.C
Timeline_SRCS:=$(Timeline_SRCS:.fl=.C)
Timeline_SRCS:=$(sort $(Timeline_SRCS))

118
util/Thread.C Normal file
View File

@ -0,0 +1,118 @@
/*******************************************************************************/
/* Copyright (C) 2008 Jonathan Moore Liles */
/* */
/* This program is free software; you can redistribute it and/or modify it */
/* under the terms of the GNU General Public License as published by the */
/* Free Software Foundation; either version 2 of the License, or (at your */
/* option) any later version. */
/* */
/* This program is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for */
/* more details. */
/* */
/* You should have received a copy of the GNU General Public License along */
/* with This program; see the file COPYING. If not,write to the Free Software */
/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
/*******************************************************************************/
#include "Thread.H"
#include <assert.h>
#include <string.h>
pthread_key_t Thread::_current = 0;
Thread::Thread ( )
{
_thread = 0;
_name = 0;
}
Thread::Thread ( const char *name )
{
_thread = 0;
_name = name;
}
void
Thread::init ( void )
{
pthread_key_create( &_current, NULL );
}
bool
Thread::is ( const char *name )
{
return ! strcmp( Thread::current()->name(), name );
}
/** to be used by existing threads (that won't call clone()) */
void
Thread::set ( const char *name )
{
_thread = pthread_self();
_name = name;
pthread_setspecific( _current, (void*)this );
}
Thread *
Thread::current ( void )
{
return (Thread*)pthread_getspecific( _current );
}
struct thread_data
{
void *(*entry_point)(void *);
void *arg;
void *t;
};
void *
Thread::run_thread ( void *arg )
{
thread_data td = *(thread_data *)arg;
delete (thread_data*)arg;
pthread_setspecific( _current, td.t );
return td.entry_point( td.arg );
}
bool
Thread::clone ( void *(*entry_point)(void *), void *arg )
{
assert( ! _thread );
thread_data *td = new thread_data;
td->entry_point = entry_point;
td->arg = arg;
td->t = this;
if ( pthread_create( &_thread, NULL, run_thread, td ) != 0 )
return false;
return true;
}
void
Thread::detach ( void )
{
pthread_detach( _thread );
_thread = 0;
}
void
Thread::join ( void )
{
pthread_join( _thread, NULL );
_thread = 0;
}

58
util/Thread.H Normal file
View File

@ -0,0 +1,58 @@
/*******************************************************************************/
/* Copyright (C) 2008 Jonathan Moore Liles */
/* */
/* This program is free software; you can redistribute it and/or modify it */
/* under the terms of the GNU General Public License as published by the */
/* Free Software Foundation; either version 2 of the License, or (at your */
/* option) any later version. */
/* */
/* This program is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for */
/* more details. */
/* */
/* You should have received a copy of the GNU General Public License along */
/* with This program; see the file COPYING. If not,write to the Free Software */
/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
/*******************************************************************************/
#pragma once
/* simple wrapper for pthreads with thread role checking */
#include <pthread.h>
#include "debug.h"
#define THREAD_ASSERT( n ) ASSERT( Thread::is( #n ), "Function called from wrong thread! (is %s, should be %s)", Thread::current()->name(), #n )
class Thread
{
static pthread_key_t _current;
pthread_t _thread;
const char * _name;
static void * run_thread ( void *arg );
public:
static bool is ( const char *name );
static void init ( void );
static Thread *current ( void );
Thread ( );
Thread ( const char *name );
const char *name ( void ) const { return _name; }
void name ( const char *name ) { _name = name; }
bool running ( void ) const { return _thread; }
void set ( const char *name );
void set ( void ) { set( _name ); }
bool clone ( void *(*entry_point)(void *), void *arg );
void detach ( void );
void join ( void );
};