Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show number of connected peers in UI #156

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/app/lib/app.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:shared_preferences/shared_preferences.dart';

import 'package:app/io/graphql/graphql.dart' as graphql;
import 'package:app/router.dart';
import 'package:app/ui/widgets/connected_peers_provider.dart';
import 'package:app/ui/widgets/image_provider.dart';
import 'package:app/ui/widgets/refresh_provider.dart';

Expand Down Expand Up @@ -56,7 +57,8 @@ class MeliAppState extends State<MeliApp> {
return GraphQLProvider(
client: client,
child: RefreshProvider(
child: MeliCameraProvider(MaterialApp.router(
child: MeliCameraProvider(ConnectedPeersProvider(
child: MaterialApp.router(
// Register router for navigation
routerDelegate: router.routerDelegate,
routeInformationProvider: router.routeInformationProvider,
Expand Down Expand Up @@ -90,6 +92,6 @@ class MeliAppState extends State<MeliApp> {

// Disable "debug" banner shown in top right corner during development
debugShowCheckedModeBanner: false,
))));
)))));
}
}
2 changes: 2 additions & 0 deletions packages/app/lib/ui/screens/all_species.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:app/models/base.dart';
import 'package:app/models/species.dart';
import 'package:app/router.dart';
import 'package:app/ui/colors.dart';
import 'package:app/ui/widgets/connected_peers.dart';
import 'package:app/ui/widgets/pagination_list.dart';
import 'package:app/ui/widgets/refresh_provider.dart';
import 'package:app/ui/widgets/scaffold.dart';
Expand All @@ -21,6 +22,7 @@ class AllSpeciesScreen extends StatelessWidget {
Widget build(BuildContext context) {
return MeliScaffold(
title: AppLocalizations.of(context)!.allSpeciesScreenTitle,
actionLeft: const ConnectedPeers(),
body: RefreshIndicator(
color: MeliColors.black,
onRefresh: () {
Expand Down
19 changes: 19 additions & 0 deletions packages/app/lib/ui/widgets/connected_peers.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

import 'package:app/ui/widgets/connected_peers_provider.dart';
import 'package:flutter/material.dart';

class ConnectedPeers extends StatefulWidget {
const ConnectedPeers({super.key});

@override
State<ConnectedPeers> createState() => _ConnectedPeersState();
}

class _ConnectedPeersState extends State<ConnectedPeers> {
@override
Widget build(BuildContext context) {
int numPeers = ConnectedPeersProvider.of(context).getNumPeers();
return Text("$numPeers");
}
}
51 changes: 51 additions & 0 deletions packages/app/lib/ui/widgets/connected_peers_provider.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

import 'dart:async';

import 'package:app/io/p2panda/p2panda.dart';
import 'package:p2panda/src/bridge_generated.dart';

import 'package:flutter/widgets.dart';

class ConnectedPeersProvider extends InheritedWidget {
StreamSubscription<NodeEvent>? subscription;
int _numPeers = 0;

ConnectedPeersProvider({
super.key,
required super.child,
});

void _init() {
if (subscription != null) {
return;
}

p2panda.subscribeEventStream().listen((event) {
print("$event");
switch (event) {
case NodeEvent.PeerConnected:
_numPeers += 1;
break;
case NodeEvent.PeerDisconnected:
_numPeers -= 1;
break;
}
});
}

int getNumPeers() {
_init();
return _numPeers;
}

static ConnectedPeersProvider of(BuildContext context) {
return context
.dependOnInheritedWidgetOfExactType<ConnectedPeersProvider>()!;
}

@override
bool updateShouldNotify(ConnectedPeersProvider oldWidget) {
return _numPeers != oldWidget._numPeers;
}
}
6 changes: 6 additions & 0 deletions packages/app/lib/ui/widgets/scaffold.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:app/ui/colors.dart';
class MeliScaffold extends StatefulWidget {
final String? title;
final Widget? body;
final Widget? actionLeft;
final Widget? actionRight;
final Color backgroundColor;
final Color appBarColor;
Expand All @@ -17,6 +18,7 @@ class MeliScaffold extends StatefulWidget {
{super.key,
this.body,
this.title,
this.actionLeft,
this.actionRight,
this.floatingActionButtons = const [],
this.fabAlignment = MainAxisAlignment.spaceBetween,
Expand All @@ -39,6 +41,10 @@ class _MeliScaffoldState extends State<MeliScaffold> {
backgroundColor: widget.appBarColor,
title: Stack(
children: [
if (widget.actionLeft != null)
Row(
mainAxisAlignment: MainAxisAlignment.start,
children: [widget.actionLeft!]),
Row(mainAxisAlignment: MainAxisAlignment.center, children: [
IconButton(
icon: const Icon(Icons.arrow_back_rounded),
Expand Down
45 changes: 45 additions & 0 deletions packages/p2panda/lib/src/bridge_generated.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ abstract class P2Panda {

FlutterRustBridgeTaskConstMeta get kStartNodeConstMeta;

/// Listen to events coming from the aquadoggo node.
Stream<NodeEvent> subscribeEventStream({dynamic hint});

FlutterRustBridgeTaskConstMeta get kSubscribeEventStreamConstMeta;

/// Turns off running node.
Future<void> shutdownNode({dynamic hint});

Expand Down Expand Up @@ -150,6 +155,11 @@ class KeyPair {
);
}

enum NodeEvent {
PeerConnected,
PeerDisconnected,
}

/// Operations are categorised by their action type.
///
/// An action defines the operation format and if this operation creates, updates or deletes a data
Expand Down Expand Up @@ -376,6 +386,23 @@ class P2PandaImpl implements P2Panda {
],
);

Stream<NodeEvent> subscribeEventStream({dynamic hint}) {
return _platform.executeStream(FlutterRustBridgeTask(
callFfi: (port_) => _platform.inner.wire_subscribe_event_stream(port_),
parseSuccessData: _wire2api_node_event,
parseErrorData: null,
constMeta: kSubscribeEventStreamConstMeta,
argValues: [],
hint: hint,
));
}

FlutterRustBridgeTaskConstMeta get kSubscribeEventStreamConstMeta =>
const FlutterRustBridgeTaskConstMeta(
debugName: "subscribe_event_stream",
argNames: [],
);

Future<void> shutdownNode({dynamic hint}) {
return _platform.executeNormal(FlutterRustBridgeTask(
callFfi: (port_) => _platform.inner.wire_shutdown_node(port_),
Expand Down Expand Up @@ -538,6 +565,10 @@ class P2PandaImpl implements P2Panda {
);
}

NodeEvent _wire2api_node_event(dynamic raw) {
return NodeEvent.values[raw as int];
}

OperationAction _wire2api_operation_action(dynamic raw) {
return OperationAction.values[raw as int];
}
Expand Down Expand Up @@ -998,6 +1029,20 @@ class P2PandaWire implements FlutterRustBridgeWireBase {
ffi.Pointer<wire_StringList>,
ffi.Pointer<wire_StringList>)>();

void wire_subscribe_event_stream(
int port_,
) {
return _wire_subscribe_event_stream(
port_,
);
}

late final _wire_subscribe_event_streamPtr =
_lookup<ffi.NativeFunction<ffi.Void Function(ffi.Int64)>>(
'wire_subscribe_event_stream');
late final _wire_subscribe_event_stream =
_wire_subscribe_event_streamPtr.asFunction<void Function(int)>();

void wire_shutdown_node(
int port_,
) {
Expand Down
3 changes: 2 additions & 1 deletion packages/p2panda/native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ flutter_rust_bridge_codegen = "1.82.6"
[dependencies]
android_logger = "0.13.1"
anyhow = "1.0.75"
aquadoggo = "0.8.0"
# aquadoggo = "0.8.0"
aquadoggo = { git = "https://github.com/p2panda/aquadoggo.git", branch = "make-node-event-public" }
ed25519-dalek = "1.0.1"
flutter_rust_bridge = "1.82.6"
log = "0.4.19"
Expand Down
26 changes: 19 additions & 7 deletions packages/p2panda/native/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use android_logger::{Config, FilterBuilder};
use anyhow::{anyhow, Result};
use aquadoggo::{AllowList, Configuration};
use ed25519_dalek::SecretKey;
use flutter_rust_bridge::RustOpaque;
use flutter_rust_bridge::{RustOpaque, StreamSink};
use log::LevelFilter;
use p2panda_rs::document::DocumentViewId;
use p2panda_rs::entry;
Expand All @@ -21,6 +21,8 @@ use crate::node::Manager;

static NODE_INSTANCE: OnceCell<Manager> = OnceCell::const_new();

pub(crate) static NODE_EVENTS_SINK: OnceCell<StreamSink<NodeEvent>> = OnceCell::const_new();

pub type HexString = String;

/// Ed25519 key pair for authors to sign p2panda entries with.
Expand Down Expand Up @@ -221,12 +223,12 @@ pub fn start_node(
// Initialise logging for Android developer console
android_logger::init_once(
Config::default()
.with_max_level(LevelFilter::Trace)
.with_filter(
FilterBuilder::new()
.filter(Some("aquadoggo"), LevelFilter::Info)
.build(),
),
.with_max_level(LevelFilter::Info)
// .with_filter(
// FilterBuilder::new()
// .filter(Some("aquadoggo"), LevelFilter::Info)
// .build(),
// ),
);

// Set node configuration
Expand Down Expand Up @@ -266,6 +268,16 @@ pub fn start_node(
Ok(())
}

pub enum NodeEvent {
PeerConnected,
PeerDisconnected,
}

/// Listen to events coming from the aquadoggo node.
pub fn subscribe_event_stream(sink: StreamSink<NodeEvent>) {
let _ = NODE_EVENTS_SINK.set(sink);
}

/// Turns off running node.
pub fn shutdown_node() {
match NODE_INSTANCE.get() {
Expand Down
37 changes: 37 additions & 0 deletions packages/p2panda/native/src/bridge_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,22 @@ fn wire_start_node_impl(
},
)
}
fn wire_subscribe_event_stream_impl(port_: MessagePort) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, (), _>(
WrapInfo {
debug_name: "subscribe_event_stream",
port: Some(port_),
mode: FfiCallMode::Stream,
},
move || {
move |task_callback| {
Result::<_, ()>::Ok(subscribe_event_stream(
task_callback.stream_sink::<_, NodeEvent>(),
))
}
},
)
}
fn wire_shutdown_node_impl(port_: MessagePort) {
FLUTTER_RUST_BRIDGE_HANDLER.wrap::<_, _, _, (), _>(
WrapInfo {
Expand Down Expand Up @@ -287,6 +303,22 @@ impl rust2dart::IntoIntoDart<KeyPair> for KeyPair {
}
}

impl support::IntoDart for NodeEvent {
fn into_dart(self) -> support::DartAbi {
match self {
Self::PeerConnected => 0,
Self::PeerDisconnected => 1,
}
.into_dart()
}
}
impl support::IntoDartExceptPrimitive for NodeEvent {}
impl rust2dart::IntoIntoDart<NodeEvent> for NodeEvent {
fn into_into_dart(self) -> Self {
self
}
}

impl support::IntoDart for OperationAction {
fn into_dart(self) -> support::DartAbi {
match self {
Expand Down Expand Up @@ -378,6 +410,11 @@ mod io {
)
}

#[no_mangle]
pub extern "C" fn wire_subscribe_event_stream(port_: i64) {
wire_subscribe_event_stream_impl(port_)
}

#[no_mangle]
pub extern "C" fn wire_shutdown_node(port_: i64) {
wire_shutdown_node_impl(port_)
Expand Down
25 changes: 25 additions & 0 deletions packages/p2panda/native/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use std::thread;

use anyhow::Result;
use aquadoggo::{Configuration, Node};
use log::info;
use p2panda_rs::identity::KeyPair;
use tokio::runtime;
use tokio::sync::mpsc::{channel, Sender};

use crate::api::{NodeEvent, NODE_EVENTS_SINK};

pub struct Manager {
shutdown_signal: Sender<bool>,
}
Expand All @@ -24,6 +27,28 @@ impl Manager {

rt.block_on(async move {
let node = Node::start(key_pair, config).await;
let mut tx = node.subscribe().await;

tokio::task::spawn(async move {
// Stream node events further into Dart world
loop {
while let Some(event) = tx.recv().await {
let node_event = match event {
aquadoggo::NodeEvent::PeerConnected => NodeEvent::PeerConnected,
aquadoggo::NodeEvent::PeerDisconnected => {
NodeEvent::PeerDisconnected
}
};

info!("{:?}", event);

NODE_EVENTS_SINK.get().map(|sink| {
let sent = sink.add(node_event);
info!("{:?}", sent);
});
}
}
});

tokio::select! {
_ = on_shutdown.recv() => (),
Expand Down