nonlib/OSC/Endpoint: Work around for liblo/UDP layer dropping packets on bulk signal listing.

This commit is contained in:
Jonathan Moore Liles 2020-10-20 18:52:37 -07:00
parent 830823a226
commit 362a153412
2 changed files with 138 additions and 20 deletions

View File

@ -23,6 +23,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <unistd.h>
#include "Endpoint.H" #include "Endpoint.H"
@ -30,6 +31,8 @@
#pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wunused-parameter"
static int SCAN_BATCH_SIZE = 100;
namespace OSC namespace OSC
{ {
@ -744,20 +747,80 @@ namespace OSC
{ {
// OSC_DMSG(); // OSC_DMSG();
DMESSAGE( "Listing signals." );
const char *prefix = NULL; const char *prefix = NULL;
int skip = 0;
bool batch_mode = true;
int start;
if ( argc ) const int count = SCAN_BATCH_SIZE;
if ( 's' == types[0] )
{
prefix = &argv[0]->s; prefix = &argv[0]->s;
DMESSAGE( "Listing signals for prefix \"%s\"", prefix );
skip++;
batch_mode = 2 == argc;
}
else
{
DMESSAGE( "Listing all signals." );
}
if ( 0 == argc || 's' == types[0] )
{
/* obsolete unbatched mode left for backwards compatibility... */
WARNING("Peer asked for unbatched signal list... will incur delays!");
batch_mode = false;
}
else
{
/* we have to list these in batches, otherwise they the UDP packets overrun and get dropped... Probably
wouldn't be an issue with TCP transport. */
batch_mode = true;
start = argv[0+skip]->i;
/* count = argv[1+skip]->i; */
}
Endpoint *ep = (Endpoint*)user_data; Endpoint *ep = (Endpoint*)user_data;
int sent = 0;
int s = 0;
bool more = false;
lo_bundle b;
if ( batch_mode )
{
b = lo_bundle_new(LO_TT_IMMEDIATE);
}
for ( std::list<Signal*>::const_iterator i = ep->_signals.begin(); i != ep->_signals.end(); ++i ) for ( std::list<Signal*>::const_iterator i = ep->_signals.begin(); i != ep->_signals.end(); ++i )
{ {
Signal *o = *i; Signal *o = *i;
if ( ! prefix || ! strncmp( o->path(), prefix, strlen(prefix) ) ) if ( ! prefix || ! strncmp( o->path(), prefix, strlen(prefix) ) )
{
if ( s++ < start )
continue;
/* DMESSAGE( "Listing signal %s", o->path() ); */
if ( batch_mode )
{
lo_message m = lo_message_new();
lo_message_add_string( m, path );
lo_message_add_string( m, o->path() );
lo_message_add_string( m, o->_direction == Signal::Input ? "in" : "out" );
lo_message_add_float( m, o->parameter_limits().min );
lo_message_add_float( m, o->parameter_limits().max );
lo_message_add_float( m, o->parameter_limits().default_value );
lo_bundle_add_message( b, "/reply", m );
}
else
{ {
ep->send( lo_message_get_source( msg ), ep->send( lo_message_get_source( msg ),
"/reply", "/reply",
@ -769,8 +832,36 @@ namespace OSC
o->parameter_limits().default_value o->parameter_limits().default_value
); );
} }
if ( batch_mode )
{
if ( ++sent == count )
{
more = true;
break;
}
}
else
{
usleep(1000);
}
}
} }
if ( batch_mode )
{
lo_message m = lo_message_new();
lo_message_add_string(m, path);
lo_message_add_int32( m, sent );
lo_message_add_int32( m, more );
lo_bundle_add_message( b, "/reply", m );
lo_send_bundle_from( lo_message_get_source( msg ), ep->_server, b );
/* ep->send( lo_message_get_source( msg ), "/reply", path, sent, more ); */
}
else
ep->send( lo_message_get_source( msg ), "/reply", path ); ep->send( lo_message_get_source( msg ), "/reply", path );
return 0; return 0;
@ -908,12 +999,37 @@ namespace OSC
if ( argc == 1 ) if ( argc == 1 )
{ {
/* old handler for backwards compatibility */
p->_scanning = false; p->_scanning = false;
DMESSAGE( "Done scanning %s", p->name ); DMESSAGE( "Done scanning %s", p->name );
if ( ep->_peer_scan_complete_callback ) if ( ep->_peer_scan_complete_callback )
ep->_peer_scan_complete_callback(ep->_peer_scan_complete_userdata); ep->_peer_scan_complete_callback(ep->_peer_scan_complete_userdata);
} }
else
if ( argc == 3 )
{
const int sent = argv[1]->i;
const int more = argv[2]->i;
if ( !more )
{
p->_scanning = false;
DMESSAGE( "Done scanning %s", p->name );
if ( ep->_peer_scan_complete_callback )
ep->_peer_scan_complete_callback(ep->_peer_scan_complete_userdata);
}
else
{
DMESSAGE( "Scanning next batch %s", p->name );
p->_scanning_current += sent;
ep->send( p->addr, "/signal/list", p->_scanning_current );
}
}
else if ( argc == 6 && p->_scanning ) else if ( argc == 6 && p->_scanning )
{ {
Signal *s = ep->find_peer_signal_by_path( p, &argv[1]->s ); Signal *s = ep->find_peer_signal_by_path( p, &argv[1]->s );
@ -921,7 +1037,7 @@ namespace OSC
if ( s ) if ( s )
return 0; return 0;
DMESSAGE( "Peer %s has signal %s (%s)", p->name, &argv[1]->s, &argv[2]->s ); /* DMESSAGE( "Peer %s has signal %s (%s)", p->name, &argv[1]->s, &argv[2]->s ); */
int dir = 0; int dir = 0;
@ -1143,10 +1259,11 @@ namespace OSC
Peer *p = add_peer(name,url); Peer *p = add_peer(name,url);
p->_scanning = true; p->_scanning = true;
p->_scanning_current = 0;
DMESSAGE( "Scanning peer %s", name ); DMESSAGE( "Scanning peer %s", name );
send( p->addr, "/signal/list" ); send( p->addr, "/signal/list", p->_scanning_current );
} }
void * void *

View File

@ -120,6 +120,7 @@ namespace OSC
struct Peer struct Peer
{ {
bool _scanning; bool _scanning;
int _scanning_current;
char *name; char *name;
lo_address addr; lo_address addr;