Enable multithreading on sending side (#1983)

This commit is contained in:
Tien Do Nam
2024-10-31 21:07:08 +01:00
committed by GitHub
parent 8633e140e9
commit fa8170d05a
21 changed files with 597 additions and 133 deletions
+8 -1
View File
@@ -11,6 +11,7 @@ import 'package:common/util/dio.dart';
import 'package:common/util/logger.dart';
import 'package:dart_mappable/dart_mappable.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:flutter/widgets.dart';
import 'package:flutter_displaymode/flutter_displaymode.dart';
import 'package:localsend_app/config/refena.dart';
@@ -23,8 +24,10 @@ import 'package:localsend_app/provider/device_info_provider.dart';
import 'package:localsend_app/provider/network/nearby_devices_provider.dart';
import 'package:localsend_app/provider/network/server/server_provider.dart';
import 'package:localsend_app/provider/persistence_provider.dart';
// [FOSS_REMOVE_START]
import 'package:localsend_app/provider/purchase_provider.dart';
// [FOSS_REMOVE_END]
import 'package:localsend_app/provider/selection/selected_sending_files_provider.dart';
import 'package:localsend_app/provider/settings_provider.dart';
@@ -33,6 +36,7 @@ import 'package:localsend_app/provider/window_dimensions_provider.dart';
import 'package:localsend_app/util/i18n.dart';
import 'package:localsend_app/util/native/autostart_helper.dart';
import 'package:localsend_app/util/native/cache_helper.dart';
import 'package:localsend_app/util/native/content_uri_helper.dart';
import 'package:localsend_app/util/native/context_menu_helper.dart';
import 'package:localsend_app/util/native/cross_file_converters.dart';
import 'package:localsend_app/util/native/device_info_helper.dart';
@@ -141,6 +145,7 @@ Future<RefenaContainer> preInit(List<String> args) async {
return IsolateController(
initialState: ParentIsolateState.initial(
SyncState(
rootIsolateToken: RootIsolateToken.instance!,
securityContext: persistenceService.getSecurityContext(),
deviceInfo: ref.read(deviceInfoProvider),
alias: settings.alias,
@@ -155,7 +160,9 @@ Future<RefenaContainer> preInit(List<String> args) async {
);
}));
await container.redux(parentIsolateProvider).dispatchAsync(IsolateSetupAction());
await container.redux(parentIsolateProvider).dispatchAsync(IsolateSetupAction(
uriContentStreamResolver: AndroidUriContentStreamResolver(),
));
return container;
}
@@ -1,7 +1,6 @@
import 'package:common/model/device.dart';
import 'package:common/model/session_status.dart';
import 'package:dart_mappable/dart_mappable.dart';
import 'package:dio/dio.dart';
import 'package:localsend_app/model/state/send/sending_file.dart';
part 'send_session_state.mapper.dart';
@@ -16,7 +15,7 @@ class SendSessionState with SendSessionStateMappable {
final Map<String, SendingFile> files; // file id as key
final int? startTime;
final int? endTime;
final CancelToken? cancelToken;
final List<SendingTask>? sendingTasks; // used to cancel tasks
final String? errorMessage;
const SendSessionState({
@@ -28,7 +27,7 @@ class SendSessionState with SendSessionStateMappable {
required this.files,
required this.startTime,
required this.endTime,
required this.cancelToken,
required this.sendingTasks,
required this.errorMessage,
});
@@ -37,6 +36,16 @@ class SendSessionState with SendSessionStateMappable {
/// SendingFile.
@override
String toString() {
return 'SendSessionState(sessionId: $sessionId, remoteSessionId: $remoteSessionId, background: $background, status: $status, target: $target, files: $files, startTime: $startTime, endTime: $endTime, cancelToken: $cancelToken, errorMessage: $errorMessage)';
return 'SendSessionState(sessionId: $sessionId, remoteSessionId: $remoteSessionId, background: $background, status: $status, target: $target, files: $files, startTime: $startTime, endTime: $endTime, sendingTasks: $sendingTasks, errorMessage: $errorMessage)';
}
}
class SendingTask {
final int isolateIndex;
final int taskId;
SendingTask({
required this.isolateIndex,
required this.taskId,
});
}
@@ -38,8 +38,8 @@ class SendSessionStateMapper extends ClassMapperBase<SendSessionState> {
static const Field<SendSessionState, int> _f$startTime = Field('startTime', _$startTime);
static int? _$endTime(SendSessionState v) => v.endTime;
static const Field<SendSessionState, int> _f$endTime = Field('endTime', _$endTime);
static CancelToken? _$cancelToken(SendSessionState v) => v.cancelToken;
static const Field<SendSessionState, CancelToken> _f$cancelToken = Field('cancelToken', _$cancelToken);
static List<SendingTask>? _$sendingTasks(SendSessionState v) => v.sendingTasks;
static const Field<SendSessionState, List<SendingTask>> _f$sendingTasks = Field('sendingTasks', _$sendingTasks);
static String? _$errorMessage(SendSessionState v) => v.errorMessage;
static const Field<SendSessionState, String> _f$errorMessage = Field('errorMessage', _$errorMessage);
@@ -53,7 +53,7 @@ class SendSessionStateMapper extends ClassMapperBase<SendSessionState> {
#files: _f$files,
#startTime: _f$startTime,
#endTime: _f$endTime,
#cancelToken: _f$cancelToken,
#sendingTasks: _f$sendingTasks,
#errorMessage: _f$errorMessage,
};
@@ -67,7 +67,7 @@ class SendSessionStateMapper extends ClassMapperBase<SendSessionState> {
files: data.dec(_f$files),
startTime: data.dec(_f$startTime),
endTime: data.dec(_f$endTime),
cancelToken: data.dec(_f$cancelToken),
sendingTasks: data.dec(_f$sendingTasks),
errorMessage: data.dec(_f$errorMessage));
}
@@ -117,6 +117,7 @@ extension SendSessionStateValueCopy<$R, $Out> on ObjectCopyWith<$R, SendSessionS
abstract class SendSessionStateCopyWith<$R, $In extends SendSessionState, $Out> implements ClassCopyWith<$R, $In, $Out> {
DeviceCopyWith<$R, Device, Device> get target;
MapCopyWith<$R, String, SendingFile, SendingFileCopyWith<$R, SendingFile, SendingFile>> get files;
ListCopyWith<$R, SendingTask, ObjectCopyWith<$R, SendingTask, SendingTask>>? get sendingTasks;
$R call(
{String? sessionId,
String? remoteSessionId,
@@ -126,7 +127,7 @@ abstract class SendSessionStateCopyWith<$R, $In extends SendSessionState, $Out>
Map<String, SendingFile>? files,
int? startTime,
int? endTime,
CancelToken? cancelToken,
List<SendingTask>? sendingTasks,
String? errorMessage});
SendSessionStateCopyWith<$R2, $In, $Out2> $chain<$R2, $Out2>(Then<$Out2, $R2> t);
}
@@ -143,6 +144,10 @@ class _SendSessionStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, Send
MapCopyWith<$R, String, SendingFile, SendingFileCopyWith<$R, SendingFile, SendingFile>> get files =>
MapCopyWith($value.files, (v, t) => v.copyWith.$chain(t), (v) => call(files: v));
@override
ListCopyWith<$R, SendingTask, ObjectCopyWith<$R, SendingTask, SendingTask>>? get sendingTasks => $value.sendingTasks != null
? ListCopyWith($value.sendingTasks!, (v, t) => ObjectCopyWith(v, $identity, t), (v) => call(sendingTasks: v))
: null;
@override
$R call(
{String? sessionId,
Object? remoteSessionId = $none,
@@ -152,7 +157,7 @@ class _SendSessionStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, Send
Map<String, SendingFile>? files,
Object? startTime = $none,
Object? endTime = $none,
Object? cancelToken = $none,
Object? sendingTasks = $none,
Object? errorMessage = $none}) =>
$apply(FieldCopyWithData({
if (sessionId != null) #sessionId: sessionId,
@@ -163,7 +168,7 @@ class _SendSessionStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, Send
if (files != null) #files: files,
if (startTime != $none) #startTime: startTime,
if (endTime != $none) #endTime: endTime,
if (cancelToken != $none) #cancelToken: cancelToken,
if (sendingTasks != $none) #sendingTasks: sendingTasks,
if (errorMessage != $none) #errorMessage: errorMessage
}));
@override
@@ -176,7 +181,7 @@ class _SendSessionStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, Send
files: data.get(#files, or: $value.files),
startTime: data.get(#startTime, or: $value.startTime),
endTime: data.get(#endTime, or: $value.endTime),
cancelToken: data.get(#cancelToken, or: $value.cancelToken),
sendingTasks: data.get(#sendingTasks, or: $value.sendingTasks),
errorMessage: data.get(#errorMessage, or: $value.errorMessage));
@override
+1
View File
@@ -392,6 +392,7 @@ class _ProgressPageState extends State<ProgressPage> with Refena {
onPressed: () async {
await ref.notifier(sendProvider).sendFile(
sessionId: widget.sessionId,
isolateIndex: 0,
file: sendSession.files[file.id]!,
isRetry: true,
);
+73 -59
View File
@@ -1,8 +1,9 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'package:common/api_route_builder.dart';
import 'package:common/isolate.dart';
import 'package:common/model/device.dart';
import 'package:common/model/dto/file_dto.dart';
import 'package:common/model/dto/info_register_dto.dart';
@@ -27,7 +28,6 @@ import 'package:localsend_app/provider/dio_provider.dart';
import 'package:localsend_app/provider/progress_provider.dart';
import 'package:localsend_app/provider/selection/selected_sending_files_provider.dart';
import 'package:localsend_app/provider/settings_provider.dart';
import 'package:localsend_app/util/stream.dart';
import 'package:localsend_app/widget/dialogs/pin_dialog.dart';
import 'package:logging/logging.dart';
import 'package:refena_flutter/refena_flutter.dart';
@@ -106,7 +106,7 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
}))),
startTime: null,
endTime: null,
cancelToken: cancelToken,
sendingTasks: [],
errorMessage: null,
);
@@ -310,25 +310,40 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
),
);
await _sendLoop(sessionId, target, sendingFiles);
await _sendLoop(ref, sessionId, target, sendingFiles);
}
Future<void> _sendLoop(String sessionId, Device target, Map<String, SendingFile> files) async {
Future<void> _sendLoop(Ref ref, String sessionId, Device target, Map<String, SendingFile> files) async {
state = state.updateSession(
sessionId: sessionId,
state: (s) => s?.copyWith(startTime: DateTime.now().millisecondsSinceEpoch),
);
for (final file in files.values) {
final result = await sendFile(
sessionId: sessionId,
file: file,
isRetry: false,
);
if (!result) {
break;
final queue = Queue<SendingFile>()..addAll(files.values);
final concurrency = ref.read(parentIsolateProvider).uploadIsolateCount;
_logger.info('Sending files using $concurrency concurrent isolates');
final futures = List.generate(concurrency, (index) async {
while (true) {
final file = switch (queue.isEmpty) {
true => null,
false => queue.removeFirst(),
};
if (file == null) {
break;
}
await sendFile(
sessionId: sessionId,
isolateIndex: index,
file: file,
isRetry: false,
);
}
}
});
await Future.wait(futures);
_finish(sessionId: sessionId);
}
@@ -367,6 +382,7 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
/// Returns true, if the next file should be sent.
Future<bool> sendFile({
required String sessionId,
required int isolateIndex,
required SendingFile file,
required bool isRetry,
}) async {
@@ -382,7 +398,6 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
}
final remoteSessionId = state[sessionId]!.remoteSessionId;
final dio = ref.read(dioProvider).longLiving;
final target = state[sessionId]!.target;
if (isRetry) {
@@ -409,48 +424,39 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
state: (s) => s?.withFileStatus(file.file.id, FileStatus.sending, null),
);
final Stream<List<int>>? fileStream = file.path != null
? file.path!.startsWith('content://')
? uriContent.getContentStream(Uri.parse(file.path!))
: File(file.path!).openRead()
: null;
final (streamController, subscription) = fileStream?.digested() ?? (null, null);
final taskResult = ref.redux(parentIsolateProvider).dispatchTakeResult(IsolateHttpUploadAction(
isolateIndex: isolateIndex,
remoteSessionId: remoteSessionId,
remoteFileToken: token,
fileId: file.file.id,
filePath: file.path,
fileBytes: file.bytes,
mime: file.file.lookupMime(),
fileSize: file.file.size,
device: target,
));
String? fileError;
try {
final cancelToken = CancelToken();
state = state.updateSession(
sessionId: sessionId,
state: (s) => s?.copyWith(cancelToken: cancelToken),
);
final stopwatch = Stopwatch()..start();
await dio.post(
ApiRoute.upload.target(target, query: {
if (remoteSessionId != null) 'sessionId': remoteSessionId,
'fileId': file.file.id,
'token': token,
}),
options: Options(
headers: {
'Content-Length': file.file.size,
'Content-Type': file.file.lookupMime(),
},
),
data: streamController?.stream ?? file.bytes!,
onSendProgress: (curr, total) {
if (stopwatch.elapsedMilliseconds >= 100) {
stopwatch.reset();
ref.notifier(progressProvider).setProgress(
sessionId: sessionId,
fileId: file.file.id,
progress: curr / total,
);
}
},
cancelToken: cancelToken,
state: (s) => s?.copyWith(sendingTasks: [
...?s.sendingTasks,
SendingTask(
isolateIndex: isolateIndex,
taskId: taskResult.taskId,
),
]),
);
await for (final progress in taskResult.progress) {
ref.notifier(progressProvider).setProgress(
sessionId: sessionId,
fileId: file.file.id,
progress: progress,
);
}
// set progress to 100% when successfully finished
ref.notifier(progressProvider).setProgress(
sessionId: sessionId,
@@ -461,13 +467,11 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
fileError = e.humanErrorMessage;
_logger.warning('Error while sending file ${file.file.fileName}', e, st);
} finally {
// Close the stream if it is still open
// ignore: unawaited_futures
streamController?.close();
// Cancel the subscription if it is still open
// ignore: unawaited_futures
subscription?.cancel();
state = state.updateSession(
sessionId: sessionId,
state: (s) => s?.copyWith(
sendingTasks: s.sendingTasks?.where((task) => !(task.isolateIndex == isolateIndex && task.taskId == taskResult.taskId)).toList()),
);
}
state = state.updateSession(
@@ -493,7 +497,8 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
return;
}
final remoteSessionId = sessionState.remoteSessionId;
sessionState.cancelToken?.cancel(); // cancel current request
_cancelRunningRequests(sessionState);
// notify the receiver
try {
@@ -515,7 +520,7 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
if (sessionState == null) {
return;
}
sessionState.cancelToken?.cancel(); // cancel current request
_cancelRunningRequests(sessionState);
state = state.updateSession(
sessionId: sessionId,
@@ -526,6 +531,15 @@ class SendNotifier extends Notifier<Map<String, SendSessionState>> {
);
}
void _cancelRunningRequests(SendSessionState state) {
for (final task in state.sendingTasks ?? <SendingTask>[]) {
ref.redux(parentIsolateProvider).dispatch(IsolateHttpUploadCancelAction(
isolateIndex: task.isolateIndex,
taskId: task.taskId,
));
}
}
/// Closes the session
void closeSession(String sessionId) {
final sessionState = state[sessionId];
@@ -8,6 +8,7 @@ import 'package:common/model/dto/file_dto.dart';
import 'package:common/model/dto/info_dto.dart';
import 'package:common/model/dto/receive_request_response_dto.dart';
import 'package:common/model/file_type.dart';
import 'package:common/util/stream.dart';
import 'package:localsend_app/gen/assets.gen.dart';
import 'package:localsend_app/gen/strings.g.dart';
import 'package:localsend_app/model/cross_file.dart';
@@ -19,7 +20,6 @@ import 'package:localsend_app/provider/network/server/controller/common.dart';
import 'package:localsend_app/provider/network/server/server_utils.dart';
import 'package:localsend_app/provider/settings_provider.dart';
import 'package:localsend_app/util/simple_server.dart';
import 'package:localsend_app/util/stream.dart';
import 'package:uri_content/uri_content.dart';
import 'package:uuid/uuid.dart';
@@ -1,3 +1,7 @@
import 'package:common/isolate.dart';
import 'package:flutter/services.dart';
import 'package:uri_content/uri_content.dart';
class ContentUriHelper {
/// Converts
/// content://com.android.externalstorage.documents/tree/primary%3ADocuments
@@ -84,3 +88,18 @@ class ContentUriHelper {
return uri.substring(0, treeIndex + 6) + encodedPath;
}
}
class AndroidUriContentStreamResolver implements UriContentStreamResolver {
UriContent? _uriContent;
@override
void init({required Object? rootIsolateToken}) {
BackgroundIsolateBinaryMessenger.ensureInitialized(rootIsolateToken as RootIsolateToken);
_uriContent = UriContent();
}
@override
Stream<Uint8List> resolve(Uri uri) {
return _uriContent!.getContentStream(uri);
}
}
+1
View File
@@ -1,4 +1,5 @@
export 'package:common/src/isolate/child/sync_provider.dart';
export 'package:common/src/isolate/child/upload_isolate.dart' show UriContentStreamResolver;
export 'package:common/src/isolate/parent/actions.dart';
export 'package:common/src/isolate/parent/actions_sync.dart';
export 'package:common/src/isolate/parent/parent_isolate_provider.dart';
@@ -1,7 +1,7 @@
import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/main.dart';
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_stream_result.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/send_to_isolate_data.dart';
import 'package:common/src/task/discovery/http_scan_discovery.dart';
import 'package:meta/meta.dart';
@@ -54,16 +54,13 @@ Future<void> setupHttpScanDiscoveryIsolate(
),
};
await for (final device in stream) {
sendToMain(IsolateTaskStreamResult(
sendToMain(IsolateTaskStreamResult.event(
id: task.id,
done: false,
data: device,
));
}
sendToMain(IsolateTaskStreamResult(
sendToMain(IsolateTaskStreamResult.done(
id: task.id,
done: true,
data: null,
));
},
);
+2 -2
View File
@@ -71,8 +71,8 @@ void _handleMessage<S>(String debugLabel, SendToIsolateData<S> message, Future<v
if (data != null) {
try {
await handler(_isolateContainer, data);
} catch (e) {
_logger.severe('Error in $debugLabel: $e', e);
} catch (e, st) {
_logger.severe('Error in $debugLabel: $e', e, st);
}
}
}
@@ -11,6 +11,7 @@ part 'sync_provider.mapper.dart';
/// In other words, the main isolate sends this state to the child isolate.
@MappableClass()
class SyncState with SyncStateMappable {
final Object rootIsolateToken;
final StoredSecurityContext securityContext;
final DeviceInfoResult deviceInfo;
final String alias;
@@ -23,6 +24,7 @@ class SyncState with SyncStateMappable {
final bool download;
SyncState({
required this.rootIsolateToken,
required this.securityContext,
required this.deviceInfo,
required this.alias,
@@ -22,6 +22,8 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
@override
final String id = 'SyncState';
static Object _$rootIsolateToken(SyncState v) => v.rootIsolateToken;
static const Field<SyncState, Object> _f$rootIsolateToken = Field('rootIsolateToken', _$rootIsolateToken);
static StoredSecurityContext _$securityContext(SyncState v) => v.securityContext;
static const Field<SyncState, StoredSecurityContext> _f$securityContext = Field('securityContext', _$securityContext);
static DeviceInfoResult _$deviceInfo(SyncState v) => v.deviceInfo;
@@ -43,6 +45,7 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
@override
final MappableFields<SyncState> fields = const {
#rootIsolateToken: _f$rootIsolateToken,
#securityContext: _f$securityContext,
#deviceInfo: _f$deviceInfo,
#alias: _f$alias,
@@ -56,6 +59,7 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
static SyncState _instantiate(DecodingData data) {
return SyncState(
rootIsolateToken: data.dec(_f$rootIsolateToken),
securityContext: data.dec(_f$securityContext),
deviceInfo: data.dec(_f$deviceInfo),
alias: data.dec(_f$alias),
@@ -112,7 +116,8 @@ extension SyncStateValueCopy<$R, $Out> on ObjectCopyWith<$R, SyncState, $Out> {
abstract class SyncStateCopyWith<$R, $In extends SyncState, $Out> implements ClassCopyWith<$R, $In, $Out> {
StoredSecurityContextCopyWith<$R, StoredSecurityContext, StoredSecurityContext> get securityContext;
$R call(
{StoredSecurityContext? securityContext,
{Object? rootIsolateToken,
StoredSecurityContext? securityContext,
DeviceInfoResult? deviceInfo,
String? alias,
int? port,
@@ -134,7 +139,8 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
$value.securityContext.copyWith.$chain((v) => call(securityContext: v));
@override
$R call(
{StoredSecurityContext? securityContext,
{Object? rootIsolateToken,
StoredSecurityContext? securityContext,
DeviceInfoResult? deviceInfo,
String? alias,
int? port,
@@ -144,6 +150,7 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
bool? serverRunning,
bool? download}) =>
$apply(FieldCopyWithData({
if (rootIsolateToken != null) #rootIsolateToken: rootIsolateToken,
if (securityContext != null) #securityContext: securityContext,
if (deviceInfo != null) #deviceInfo: deviceInfo,
if (alias != null) #alias: alias,
@@ -156,6 +163,7 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
}));
@override
SyncState $make(CopyWithData data) => SyncState(
rootIsolateToken: data.get(#rootIsolateToken, or: $value.rootIsolateToken),
securityContext: data.get(#securityContext, or: $value.securityContext),
deviceInfo: data.get(#deviceInfo, or: $value.deviceInfo),
alias: data.get(#alias, or: $value.alias),
@@ -0,0 +1,148 @@
import 'dart:io';
import 'dart:typed_data';
import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/main.dart';
import 'package:common/src/isolate/child/sync_provider.dart';
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/send_to_isolate_data.dart';
import 'package:common/src/task/upload/http_upload.dart';
import 'package:common/util/stream.dart';
import 'package:dio/dio.dart';
import 'package:meta/meta.dart';
import 'package:refena/refena.dart';
sealed class BaseHttpUploadTask {}
class HttpUploadSetContentStreamResolverTask implements BaseHttpUploadTask {
final UriContentStreamResolver resolver;
HttpUploadSetContentStreamResolverTask({
required this.resolver,
});
}
class HttpUploadTask implements BaseHttpUploadTask {
final String? remoteSessionId;
final String remoteFileToken;
final String fileId;
final String? filePath;
final List<int>? fileBytes;
final String mime;
final int fileSize;
final Device device;
HttpUploadTask({
required this.remoteSessionId,
required this.remoteFileToken,
required this.fileId,
required this.filePath,
required this.fileBytes,
required this.mime,
required this.fileSize,
required this.device,
});
}
class HttpUploadCancelTask implements BaseHttpUploadTask {
final int taskId;
HttpUploadCancelTask({required this.taskId});
}
/// Map of cancel tokens for each task.
/// Task ID -> CancelToken
final _cancelTokenProvider = Provider((ref) => <int, CancelToken>{});
abstract class UriContentStreamResolver {
/// Separate initialization method to create instance in the child isolate.
/// Cannot reference the RootIsolateToken class because it is not part of Dart.
void init({required Object? rootIsolateToken});
/// Resolves the content stream for the given URI.
Stream<Uint8List> resolve(Uri uri);
}
UriContentStreamResolver? _uriContentStreamResolver;
@internal
Future<void> setupHttpUploadIsolate(
Stream<SendToIsolateData<IsolateTask<BaseHttpUploadTask>>> receiveFromMain,
void Function(IsolateTaskStreamResult<double>) sendToMain,
InitialData initialData,
) async {
await setupChildIsolateHelper(
debugLabel: 'HttpUploadIsolate',
receiveFromMain: receiveFromMain,
sendToMain: sendToMain,
initialData: initialData,
handler: (ref, task) async {
final HttpUploadTask uploadTask;
switch (task.data) {
case HttpUploadSetContentStreamResolverTask task:
final rootIsolateToken = ref.read(syncProvider).rootIsolateToken;
task.resolver.init(
rootIsolateToken: rootIsolateToken,
);
_uriContentStreamResolver = task.resolver;
return;
case HttpUploadTask task:
uploadTask = task;
break;
case HttpUploadCancelTask task:
final cancelToken = ref.read(_cancelTokenProvider)[task.taskId];
cancelToken?.cancel();
ref.read(_cancelTokenProvider).remove(task.taskId);
return;
}
final Stream<List<int>>? fileStream = uploadTask.filePath != null
? _uriContentStreamResolver != null && uploadTask.filePath!.startsWith('content://')
? _uriContentStreamResolver!.resolve(Uri.parse(uploadTask.filePath!))
: File(uploadTask.filePath!).openRead()
: null;
final (streamController, subscription) = fileStream?.digested() ?? (null, null);
try {
final cancelToken = CancelToken();
ref.read(_cancelTokenProvider).putIfAbsent(task.id, () => cancelToken);
await ref.read(httpUploadProvider).upload(
stream: streamController?.stream ?? Stream.fromIterable([uploadTask.fileBytes!]),
contentLength: uploadTask.fileSize,
contentType: uploadTask.mime,
target: uploadTask.device,
remoteSessionId: uploadTask.remoteSessionId,
fileId: uploadTask.fileId,
token: uploadTask.remoteFileToken,
onSendProgress: (progress) {
sendToMain(IsolateTaskStreamResult.event(
id: task.id,
data: progress,
));
},
cancelToken: cancelToken,
);
sendToMain(IsolateTaskStreamResult.done(
id: task.id,
));
} catch (e) {
sendToMain(IsolateTaskStreamResult.error(
id: task.id,
error: e.toString(),
));
} finally {
// Close the stream if it is still open
// ignore: unawaited_futures
streamController?.close();
// Cancel the subscription if it is still open
// ignore: unawaited_futures
subscription?.cancel();
}
},
);
}
@@ -1,5 +1,4 @@
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/isolate_task_stream_result.dart';
/// A data structure that can be sent to an isolate.
/// This is used to represent the following schemas:
@@ -13,3 +13,55 @@ class IsolateTaskResult<T> {
required this.data,
});
}
/// Stream version of [IsolateTaskResult].
class IsolateTaskStreamResult<T> {
/// The id of the task.
/// Corresponds to [IsolateTask.id] that started the stream.
final int id;
/// If true, the stream is done.
final bool done;
/// The error.
final Object? error;
/// A single data event from the stream.
final T? data;
IsolateTaskStreamResult._({
required this.id,
required this.done,
required this.data,
required this.error,
});
IsolateTaskStreamResult.event({
required this.id,
required this.data,
}) : done = false,
error = null;
IsolateTaskStreamResult.done({
required this.id,
}) : done = true,
data = null,
error = null;
IsolateTaskStreamResult.error({
required this.id,
required this.error,
}) : done = true,
data = null;
}
/// A special data payload to acknowledge the reception of a stream event.
class IsolateTaskStreamAckResult<T> extends IsolateTaskStreamResult<T> {
IsolateTaskStreamAckResult({
required super.id,
}) : super._(
data: null,
done: false,
error: null,
);
}
@@ -1,21 +0,0 @@
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
/// Stream version of [IsolateTaskResult].
class IsolateTaskStreamResult<T> {
/// The id of the task.
/// Corresponds to [IsolateTask.id] that started the stream.
final int id;
/// If true, the stream is done.
final bool done;
/// A single data event from the stream.
final T? data;
IsolateTaskStreamResult({
required this.id,
required this.done,
required this.data,
});
}
+127 -22
View File
@@ -4,8 +4,9 @@ import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/http_scan_discovery_isolate.dart';
import 'package:common/src/isolate/child/http_target_discovery_isolate.dart';
import 'package:common/src/isolate/child/multicast_discovery_isolate.dart';
import 'package:common/src/isolate/child/upload_isolate.dart';
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_stream_result.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/send_to_isolate_data.dart';
import 'package:common/src/isolate/parent/parent_isolate_provider.dart';
import 'package:common/src/util/id_provider.dart';
@@ -77,13 +78,10 @@ class IsolateInterfaceHttpDiscoveryAction extends ReduxActionWithResult<IsolateC
throw StateError('httpScanDiscovery is not initialized');
}
final task = IsolateTask(
id: _idProvider.getNextId(),
data: HttpInterfaceScanTask(
networkInterface: networkInterface,
port: port,
https: https,
),
final task = HttpInterfaceScanTask(
networkInterface: networkInterface,
port: port,
https: https,
);
return (
@@ -112,12 +110,9 @@ class IsolateFavoriteHttpDiscoveryAction extends ReduxActionWithResult<IsolateCo
throw StateError('httpScanDiscovery is not initialized');
}
final task = IsolateTask(
id: _idProvider.getNextId(),
data: HttpFavoriteScanTask(
favorites: favorites,
https: https,
),
final task = HttpFavoriteScanTask(
favorites: favorites,
https: https,
);
return (
@@ -149,29 +144,139 @@ class IsolateSendMulticastAnnouncementAction extends ReduxAction<IsolateControll
}
}
/// Sends the task to the isolate
class IsolateHttpUploadActionResult {
final int taskId;
final Stream<double> progress;
IsolateHttpUploadActionResult({
required this.taskId,
required this.progress,
});
}
class IsolateHttpUploadAction extends ReduxActionWithResult<IsolateController, ParentIsolateState, IsolateHttpUploadActionResult> {
final int isolateIndex;
final String? remoteSessionId;
final String remoteFileToken;
final String fileId;
final String? filePath;
final List<int>? fileBytes;
final String mime;
final int fileSize;
final Device device;
IsolateHttpUploadAction({
required this.isolateIndex,
required this.remoteSessionId,
required this.remoteFileToken,
required this.fileId,
required this.filePath,
required this.fileBytes,
required this.mime,
required this.fileSize,
required this.device,
});
@override
(ParentIsolateState, IsolateHttpUploadActionResult) reduce() {
final connection = state.httpUpload[isolateIndex];
final task = HttpUploadTask(
remoteSessionId: remoteSessionId,
remoteFileToken: remoteFileToken,
fileId: fileId,
filePath: filePath,
fileBytes: fileBytes,
mime: mime,
fileSize: fileSize,
device: device,
);
final taskId = _idProvider.getNextId();
final progress = _sendTaskAndListenStream(
task: task,
connection: connection,
);
return (
state,
IsolateHttpUploadActionResult(
taskId: taskId,
progress: progress,
),
);
}
}
class IsolateHttpUploadCancelAction extends ReduxAction<IsolateController, ParentIsolateState> {
final int isolateIndex;
final int taskId;
IsolateHttpUploadCancelAction({
required this.isolateIndex,
required this.taskId,
});
@override
ParentIsolateState reduce() {
final connection = state.httpUpload[isolateIndex];
connection.sendToIsolate(SendToIsolateData(
syncState: null,
data: IsolateTask(
id: _idProvider.getNextId(),
data: HttpUploadCancelTask(
taskId: taskId,
),
),
));
return state;
}
}
/// Sends a task to the isolate
/// and transforms [IsolateTaskStreamResult] into a proper stream making it easier to work with.
Stream<R> _sendTaskAndListenStream<R, T>({
required IsolateTask<T> task,
required T task,
required IsolateConnector<IsolateTaskStreamResult<R>, SendToIsolateData<IsolateTask<T>>> connection,
}) {
final wrappedTask = IsolateTask(
id: _idProvider.getNextId(),
data: task,
);
// ignore: unawaited_futures
Future.microtask(() {
connection.sendToIsolate(SendToIsolateData<IsolateTask<T>>(
syncState: null,
data: task,
data: wrappedTask,
));
});
return _convertResponseToStream<R, T>(
taskId: wrappedTask.id,
connection: connection,
);
}
Stream<R> _convertResponseToStream<R, T>({
required int taskId,
required IsolateConnector<IsolateTaskStreamResult<R>, SendToIsolateData<IsolateTask<T>>> connection,
}) {
final controller = StreamController<R>();
late StreamSubscription subscription;
subscription = connection.receiveFromIsolate.listen((result) {
if (result.id == task.id) {
if (result.done) {
subscription.cancel(); // ignore: discarded_futures
controller.close(); // ignore: discarded_futures
} else {
if (result.id == taskId) {
if (result.data != null) {
controller.add(result.data as R);
} else if (result.done) {
if (result.error != null) {
controller.addError(result.error!);
} else {
subscription.cancel(); // ignore: discarded_futures
controller.close(); // ignore: discarded_futures
}
}
}
});
@@ -4,9 +4,9 @@ import 'package:common/src/isolate/child/http_target_discovery_isolate.dart';
import 'package:common/src/isolate/child/main.dart';
import 'package:common/src/isolate/child/multicast_discovery_isolate.dart';
import 'package:common/src/isolate/child/sync_provider.dart';
import 'package:common/src/isolate/child/upload_isolate.dart';
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/isolate_task_stream_result.dart';
import 'package:common/src/isolate/dto/send_to_isolate_data.dart';
import 'package:common/src/util/isolate_helper.dart';
import 'package:dart_mappable/dart_mappable.dart';
@@ -15,6 +15,8 @@ import 'package:refena/refena.dart';
part 'parent_isolate_provider.mapper.dart';
const _uploadIsolateCount = 2;
/// Holds the state of the parent isolate that is visible in the main Flutter isolate.
/// The [ParentIsolateState.syncState] is synchronized with all child isolates.
/// Additionally, holds the objects to communicate with the child isolates.
@@ -24,12 +26,15 @@ class ParentIsolateState with ParentIsolateStateMappable {
final IsolateConnector<IsolateTaskStreamResult<Device>, SendToIsolateData<IsolateTask<HttpScanTask>>>? httpScanDiscovery;
final IsolateConnector<IsolateTaskResult<Device?>, SendToIsolateData<IsolateTask<HttpTargetTask>>>? httpTargetDiscovery;
final IsolateConnector<Device, SendToIsolateData<MulticastAnnouncementTask>>? multicastDiscovery;
final List<IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>> httpUpload;
int get uploadIsolateCount => httpUpload.length;
ParentIsolateState({
required this.syncState,
required this.httpScanDiscovery,
required this.httpTargetDiscovery,
required this.multicastDiscovery,
required this.httpUpload,
});
static ParentIsolateState initial(SyncState syncState) => ParentIsolateState(
@@ -37,6 +42,7 @@ class ParentIsolateState with ParentIsolateStateMappable {
httpScanDiscovery: null,
httpTargetDiscovery: null,
multicastDiscovery: null,
httpUpload: [],
);
@override
@@ -63,6 +69,13 @@ class IsolateController extends ReduxNotifier<ParentIsolateState> {
/// Starts the required isolates.
/// Should be called by the main isolate.
class IsolateSetupAction extends AsyncReduxAction<IsolateController, ParentIsolateState> {
/// If provided, file paths starting with "content://" will be resolved using this resolver.
final UriContentStreamResolver? uriContentStreamResolver;
IsolateSetupAction({
required this.uriContentStreamResolver,
});
@override
Future<ParentIsolateState> reduce() async {
final httpScanDiscovery = await startIsolate<IsolateTaskStreamResult<Device>, SendToIsolateData<IsolateTask<HttpScanTask>>, InitialData>(
@@ -89,10 +102,37 @@ class IsolateSetupAction extends AsyncReduxAction<IsolateController, ParentIsola
),
);
final httpUploadIsolates = List.generate(
_uploadIsolateCount,
(index) async {
final httpUpload = await startIsolate<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>, InitialData>(
task: setupHttpUploadIsolate,
param: InitialData(
syncState: state.syncState,
logLevel: Logger.root.level,
),
);
if (uriContentStreamResolver != null) {
httpUpload.sendToIsolate(SendToIsolateData(
syncState: null,
data: IsolateTask(
id: -1,
data: HttpUploadSetContentStreamResolverTask(resolver: uriContentStreamResolver!),
),
));
}
return httpUpload;
},
growable: false,
);
return state.copyWith(
httpScanDiscovery: httpScanDiscovery,
httpTargetDiscovery: httpTargetDiscovery,
multicastDiscovery: multicastDiscovery,
httpUpload: await Future.wait(httpUploadIsolates),
);
}
}
@@ -35,6 +35,11 @@ class ParentIsolateStateMapper extends ClassMapperBase<ParentIsolateState> {
static IsolateConnector<Device, SendToIsolateData<MulticastAnnouncementTask>>? _$multicastDiscovery(ParentIsolateState v) => v.multicastDiscovery;
static const Field<ParentIsolateState, IsolateConnector<Device, SendToIsolateData<MulticastAnnouncementTask>>> _f$multicastDiscovery =
Field('multicastDiscovery', _$multicastDiscovery);
static List<IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>> _$httpUpload(
ParentIsolateState v) =>
v.httpUpload;
static const Field<ParentIsolateState, List<IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>>>
_f$httpUpload = Field('httpUpload', _$httpUpload);
@override
final MappableFields<ParentIsolateState> fields = const {
@@ -42,6 +47,7 @@ class ParentIsolateStateMapper extends ClassMapperBase<ParentIsolateState> {
#httpScanDiscovery: _f$httpScanDiscovery,
#httpTargetDiscovery: _f$httpTargetDiscovery,
#multicastDiscovery: _f$multicastDiscovery,
#httpUpload: _f$httpUpload,
};
static ParentIsolateState _instantiate(DecodingData data) {
@@ -49,7 +55,8 @@ class ParentIsolateStateMapper extends ClassMapperBase<ParentIsolateState> {
syncState: data.dec(_f$syncState),
httpScanDiscovery: data.dec(_f$httpScanDiscovery),
httpTargetDiscovery: data.dec(_f$httpTargetDiscovery),
multicastDiscovery: data.dec(_f$multicastDiscovery));
multicastDiscovery: data.dec(_f$multicastDiscovery),
httpUpload: data.dec(_f$httpUpload));
}
@override
@@ -98,11 +105,17 @@ extension ParentIsolateStateValueCopy<$R, $Out> on ObjectCopyWith<$R, ParentIsol
abstract class ParentIsolateStateCopyWith<$R, $In extends ParentIsolateState, $Out> implements ClassCopyWith<$R, $In, $Out> {
SyncStateCopyWith<$R, SyncState, SyncState> get syncState;
ListCopyWith<
$R,
IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>,
ObjectCopyWith<$R, IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>,
IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>>> get httpUpload;
$R call(
{SyncState? syncState,
IsolateConnector<IsolateTaskStreamResult<Device>, SendToIsolateData<IsolateTask<HttpScanTask>>>? httpScanDiscovery,
IsolateConnector<IsolateTaskResult<Device?>, SendToIsolateData<IsolateTask<HttpTargetTask>>>? httpTargetDiscovery,
IsolateConnector<Device, SendToIsolateData<MulticastAnnouncementTask>>? multicastDiscovery});
IsolateConnector<Device, SendToIsolateData<MulticastAnnouncementTask>>? multicastDiscovery,
List<IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>>? httpUpload});
ParentIsolateStateCopyWith<$R2, $In, $Out2> $chain<$R2, $Out2>(Then<$Out2, $R2> t);
}
@@ -115,19 +128,33 @@ class _ParentIsolateStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, Pa
@override
SyncStateCopyWith<$R, SyncState, SyncState> get syncState => $value.syncState.copyWith.$chain((v) => call(syncState: v));
@override
$R call({SyncState? syncState, Object? httpScanDiscovery = $none, Object? httpTargetDiscovery = $none, Object? multicastDiscovery = $none}) =>
ListCopyWith<
$R,
IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>,
ObjectCopyWith<$R, IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>,
IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>>> get httpUpload =>
ListCopyWith($value.httpUpload, (v, t) => ObjectCopyWith(v, $identity, t), (v) => call(httpUpload: v));
@override
$R call(
{SyncState? syncState,
Object? httpScanDiscovery = $none,
Object? httpTargetDiscovery = $none,
Object? multicastDiscovery = $none,
List<IsolateConnector<IsolateTaskStreamResult<double>, SendToIsolateData<IsolateTask<BaseHttpUploadTask>>>>? httpUpload}) =>
$apply(FieldCopyWithData({
if (syncState != null) #syncState: syncState,
if (httpScanDiscovery != $none) #httpScanDiscovery: httpScanDiscovery,
if (httpTargetDiscovery != $none) #httpTargetDiscovery: httpTargetDiscovery,
if (multicastDiscovery != $none) #multicastDiscovery: multicastDiscovery
if (multicastDiscovery != $none) #multicastDiscovery: multicastDiscovery,
if (httpUpload != null) #httpUpload: httpUpload
}));
@override
ParentIsolateState $make(CopyWithData data) => ParentIsolateState(
syncState: data.get(#syncState, or: $value.syncState),
httpScanDiscovery: data.get(#httpScanDiscovery, or: $value.httpScanDiscovery),
httpTargetDiscovery: data.get(#httpTargetDiscovery, or: $value.httpTargetDiscovery),
multicastDiscovery: data.get(#multicastDiscovery, or: $value.multicastDiscovery));
multicastDiscovery: data.get(#multicastDiscovery, or: $value.multicastDiscovery),
httpUpload: data.get(#httpUpload, or: $value.httpUpload));
@override
ParentIsolateStateCopyWith<$R2, ParentIsolateState, $Out2> $chain<$R2, $Out2>(Then<$Out2, $R2> t) =>
@@ -0,0 +1,51 @@
import 'package:common/api_route_builder.dart';
import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/dio_provider.dart';
import 'package:dio/dio.dart';
import 'package:refena/refena.dart';
final httpUploadProvider = ViewProvider((ref) {
final dio = ref.watch(dioProvider).longLiving;
return HttpUploadService(dio);
});
class HttpUploadService {
final Dio _dio;
HttpUploadService(this._dio);
Future<void> upload({
required Stream<List<int>> stream,
required int contentLength,
required String contentType,
required Device target,
required String? remoteSessionId,
required String fileId,
required String token,
required void Function(double) onSendProgress,
required CancelToken cancelToken,
}) async {
final stopwatch = Stopwatch()..start();
await _dio.post(
ApiRoute.upload.target(target, query: {
if (remoteSessionId != null) 'sessionId': remoteSessionId,
'fileId': fileId,
'token': token,
}),
options: Options(
headers: {
'Content-Length': contentLength,
'Content-Type': contentType,
},
),
data: stream,
onSendProgress: (curr, total) {
if (stopwatch.elapsedMilliseconds >= 100) {
stopwatch.reset();
onSendProgress(curr / total);
}
},
cancelToken: cancelToken,
);
}
}