Files
ansible-roles/roles/lemonldap_ng/files/Web.pm

229 lines
6.1 KiB
Perl
Raw Normal View History

2025-08-22 17:00:22 +02:00
package Lemonldap::NG::Common::MessageBroker::Web;
use strict;
use IO::Socket::INET;
use IO::Socket::SSL;
use IO::Select;
use Lemonldap::NG::Common::FormEncode;
use Lemonldap::NG::Common::UserAgent;
use JSON;
use Protocol::WebSocket::Client;
our $VERSION = '2.22.0';
use constant DEFAULTWS => 'localhost:8080';
our $pr = '::MessageBroker::Web:';
sub new {
my ( $class, $conf, $logger ) = @_;
my $args = $conf->{messageBrokerOptions} // {};
my $ssl = '';
unless ( $args->{server} ) {
$args->{server} = DEFAULTWS;
$logger->info("$pr no server given");
}
2025-08-28 17:00:23 +02:00
elsif ( $args->{server} =~ m#^(?:(?:http|ws)(s)?://)?(.+?)/*$# ) {
2025-08-22 17:00:22 +02:00
$args->{server} = $2;
2025-08-28 17:00:23 +02:00
$ssl = 's' if $1;
2025-08-22 17:00:22 +02:00
}
else {
$logger->error("$pr unparsable server '$args->{server}'");
$args->{server} = DEFAULTWS;
}
$logger->debug("$pr using server $args->{server}");
my $self = bless {
logger => $logger,
server => $args->{server},
messages => {},
ssl => $ssl,
token => $args->{token},
ua => $args->{ua} || Lemonldap::NG::Common::UserAgent->new($conf),
}, $class;
$self->{ua}->env_proxy();
return $self;
}
sub publish {
my ( $self, $channel, $msg ) = @_;
die 'Not a hash msg' unless ref $msg eq 'HASH';
$msg->{channel} = $channel;
my $j = eval { JSON::to_json($msg) };
if ($@) {
$self->logger->error("$pr message error: $@");
return;
}
my $req = HTTP::Request->new(
POST => "http$self->{ssl}://$self->{server}/publish",
[
'Content-Length' => length($j),
(
$self->{token}
? ( Authorization => "Bearer $self->{token}" )
: ()
)
],
$j
);
my $resp = $self->ua->request($req);
$resp->is_success
? ( $self->logger->debug("$pr publish $msg->{action}") )
: ( $self->logger->error( "$pr publish error: " . $resp->status_line ) );
}
sub subscribe {
my ( $self, $channel ) = @_;
return
if $self->{channels}
and $self->{channels} =~ /^(?:.*,)?$channel(?:,.*)?$/;
$self->{channels} =
$self->{channels} ? "$self->{channels},$channel" : $channel;
$self->{messages}{$channel} = [];
$self->logger->debug("$pr subscribe to $self->{channels}");
my $sock = $self->_connect;
}
sub getNextMessage {
my ( $self, $channel ) = @_;
return undef
unless $self->{ws} and defined $self->{messages}{$channel};
return shift( @{ $self->{messages}{$channel} } )
if @{ $self->{messages}{$channel} };
$self->_read_socket;
return shift( @{ $self->{messages}{$channel} } )
if @{ $self->{messages}{$channel} };
}
sub waitForNextMessage {
my ( $self, $channel ) = @_;
return undef
unless $self->{messages}{$channel};
my $res;
do {
$res = $self->getNextMessage($channel);
sleep 1 unless $res;
} while ( !$res );
return $res;
}
sub _connect {
my ($self) = @_;
my ( $host, $port ) = split /:/, $self->{server};
2025-08-28 17:00:23 +02:00
# Remove any path from host or port
$host =~ s|/.*$||;
$port =~ s|/.*$||;
# If port is not defined, use 80 or 443
unless ( $port =~ m/^\d+$/ ){
$port = ( $self->{ssl} eq 's' ) ? 443 : 80;
}
2025-08-22 17:00:22 +02:00
my $sock = IO::Socket::INET->new(
PeerHost => $host,
PeerPort => $port,
Proto => 'tcp',
Timeout => 5,
)
or do {
$self->logger->error("$pr Failed to connect to $self->{server}: $!");
$self->{connected} = 0;
return;
};
$self->logger->debug("$pr connected");
if ( $self->{ssl} ) {
$sock = IO::Socket::SSL->start_SSL( $sock, SSL_verify_mode => 0 )
or do {
$self->logger->error("$pr SSL error: $!");
$self->{connected} = 0;
return;
};
$self->logger->debug("$pr connection upgraded to TLS");
}
2025-08-28 17:00:23 +02:00
my $url = "ws$self->{ssl}://$self->{server}/subscribe?"
2025-08-22 17:00:22 +02:00
. build_urlencoded( channels => $self->{channels} );
$self->logger->debug("$pr connects to $url");
my $client = Protocol::WebSocket::Client->new( url => $url );
$client->on(
read => sub {
my ( $c, $buf ) = @_;
if ( $buf =~ /^{.*}$/ ) {
eval {
my $data = JSON::decode_json($buf);
if ( $data->{channel}
&& defined $self->{messages}->{ $data->{channel} } )
{
push @{ $self->{messages}->{ $data->{channel} } },
$data;
}
else {
$self->logger->info(
"$pr received a message for an unknown channel");
}
};
$self->logger->error("$pr unable to read websocket: $@")
if ($@);
}
else {
$self->logger->warn("$pr received an unreadable message: $buf");
}
}
);
$client->on(
write => sub {
my ( $c, $buf ) = @_;
print $sock $buf;
}
);
$client->on(
error => sub {
$self->logger->error("$pr websocket error: $_[1]");
}
);
$client->{hs}->{req}->{headers} =
[ Authorization => "Bearer $self->{token}", ]
if $self->{token};
$client->connect();
2025-08-28 17:00:23 +02:00
my $buf;
$sock->sysread( $buf, 4096 );
$client->read($buf);
2025-08-22 17:00:22 +02:00
$self->{socket} = $sock;
$self->{selector} = IO::Select->new($sock);
$self->{ws} = $client;
$self->{connected} = 1;
}
sub _read_socket {
my ($self) = @_;
return unless $self->{connected};
return unless $self->{selector}->can_read(0.01);
my $sock = $self->{socket};
my $buf;
my $n = sysread( $sock, $buf, 4096 );
if ( !defined $n || $n == 0 ) {
warn "Connection lost, trying to reconnect...\n";
$self->{connected} = 0;
$self->_connect;
return;
}
$self->{ws}->read($buf);
}
# Accessors
sub logger {
return $_[0]->{logger};
}
sub ua {
return $_[0]->{ua};
}
1;