feat: use rhttp (#1988)

This commit is contained in:
Tien Do Nam
2024-11-03 00:33:22 +01:00
committed by GitHub
parent 74ae622ff2
commit f56d6bde86
17 changed files with 210 additions and 33 deletions
+1
View File
@@ -1,3 +1,4 @@
export 'package:common/src/isolate/child/http_provider.dart' show CustomHttpClient, CustomCancelToken;
export 'package:common/src/isolate/child/sync_provider.dart';
export 'package:common/src/isolate/child/upload_isolate.dart' show UriContentStreamResolver;
export 'package:common/src/isolate/parent/actions.dart';
@@ -0,0 +1,45 @@
import 'package:common/src/isolate/child/sync_provider.dart';
import 'package:refena/refena.dart';
/// An abstraction to provide a custom http client.
abstract class CustomHttpClient {
Future<void> postStream({
required String uri,
required Map<String, String> query,
required Map<String, String> headers,
required Stream<List<int>> stream,
required void Function(double) onSendProgress,
required CustomCancelToken cancelToken,
});
}
class CustomCancelToken {
void Function()? _cancel;
void cancel() {
_cancel?.call();
}
void setCancel(void Function() cancel) {
_cancel = cancel;
}
}
class HttpClientCollection {
final CustomHttpClient discovery;
final CustomHttpClient longLiving;
HttpClientCollection({
required this.discovery,
required this.longLiving,
});
}
final httpProvider = ViewProvider((ref) {
final (clientFactory, securityContext, discoveryTimeout) =
ref.watch(syncProvider.select((state) => (state.httpClientFactory, state.securityContext, state.discoveryTimeout)));
return HttpClientCollection(
discovery: clientFactory(Duration(milliseconds: discoveryTimeout), securityContext),
longLiving: clientFactory(const Duration(days: 30), securityContext),
);
});
+2
View File
@@ -49,6 +49,8 @@ Future<void> setupChildIsolateHelper<S, R>({
),
);
await initialData.syncState.init();
if (init != null) {
await init(_isolateContainer);
}
@@ -1,6 +1,7 @@
import 'package:common/model/device_info_result.dart';
import 'package:common/model/dto/multicast_dto.dart';
import 'package:common/model/stored_security_context.dart';
import 'package:common/src/isolate/child/http_provider.dart';
import 'package:dart_mappable/dart_mappable.dart';
import 'package:meta/meta.dart';
import 'package:refena/refena.dart';
@@ -11,7 +12,9 @@ part 'sync_provider.mapper.dart';
/// In other words, the main isolate sends this state to the child isolate.
@MappableClass()
class SyncState with SyncStateMappable {
final Future<void> Function() init;
final Object rootIsolateToken;
final CustomHttpClient Function(Duration timeout, StoredSecurityContext) httpClientFactory;
final StoredSecurityContext securityContext;
final DeviceInfoResult deviceInfo;
final String alias;
@@ -24,7 +27,9 @@ class SyncState with SyncStateMappable {
final bool download;
SyncState({
required this.init,
required this.rootIsolateToken,
required this.httpClientFactory,
required this.securityContext,
required this.deviceInfo,
required this.alias,
@@ -22,8 +22,14 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
@override
final String id = 'SyncState';
static Function _$init(SyncState v) => (v as dynamic).init as Function;
static dynamic _arg$init(f) => f<Future<void> Function()>();
static const Field<SyncState, Function> _f$init = Field('init', _$init, arg: _arg$init);
static Object _$rootIsolateToken(SyncState v) => v.rootIsolateToken;
static const Field<SyncState, Object> _f$rootIsolateToken = Field('rootIsolateToken', _$rootIsolateToken);
static Function _$httpClientFactory(SyncState v) => (v as dynamic).httpClientFactory as Function;
static dynamic _arg$httpClientFactory(f) => f<CustomHttpClient Function(Duration, StoredSecurityContext)>();
static const Field<SyncState, Function> _f$httpClientFactory = Field('httpClientFactory', _$httpClientFactory, arg: _arg$httpClientFactory);
static StoredSecurityContext _$securityContext(SyncState v) => v.securityContext;
static const Field<SyncState, StoredSecurityContext> _f$securityContext = Field('securityContext', _$securityContext);
static DeviceInfoResult _$deviceInfo(SyncState v) => v.deviceInfo;
@@ -45,7 +51,9 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
@override
final MappableFields<SyncState> fields = const {
#init: _f$init,
#rootIsolateToken: _f$rootIsolateToken,
#httpClientFactory: _f$httpClientFactory,
#securityContext: _f$securityContext,
#deviceInfo: _f$deviceInfo,
#alias: _f$alias,
@@ -59,7 +67,9 @@ class SyncStateMapper extends ClassMapperBase<SyncState> {
static SyncState _instantiate(DecodingData data) {
return SyncState(
init: data.dec(_f$init),
rootIsolateToken: data.dec(_f$rootIsolateToken),
httpClientFactory: data.dec(_f$httpClientFactory),
securityContext: data.dec(_f$securityContext),
deviceInfo: data.dec(_f$deviceInfo),
alias: data.dec(_f$alias),
@@ -116,7 +126,9 @@ extension SyncStateValueCopy<$R, $Out> on ObjectCopyWith<$R, SyncState, $Out> {
abstract class SyncStateCopyWith<$R, $In extends SyncState, $Out> implements ClassCopyWith<$R, $In, $Out> {
StoredSecurityContextCopyWith<$R, StoredSecurityContext, StoredSecurityContext> get securityContext;
$R call(
{Object? rootIsolateToken,
{Future<void> Function()? init,
Object? rootIsolateToken,
CustomHttpClient Function(Duration, StoredSecurityContext)? httpClientFactory,
StoredSecurityContext? securityContext,
DeviceInfoResult? deviceInfo,
String? alias,
@@ -139,7 +151,9 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
$value.securityContext.copyWith.$chain((v) => call(securityContext: v));
@override
$R call(
{Object? rootIsolateToken,
{Future<void> Function()? init,
Object? rootIsolateToken,
CustomHttpClient Function(Duration, StoredSecurityContext)? httpClientFactory,
StoredSecurityContext? securityContext,
DeviceInfoResult? deviceInfo,
String? alias,
@@ -150,7 +164,9 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
bool? serverRunning,
bool? download}) =>
$apply(FieldCopyWithData({
if (init != null) #init: init,
if (rootIsolateToken != null) #rootIsolateToken: rootIsolateToken,
if (httpClientFactory != null) #httpClientFactory: httpClientFactory,
if (securityContext != null) #securityContext: securityContext,
if (deviceInfo != null) #deviceInfo: deviceInfo,
if (alias != null) #alias: alias,
@@ -163,7 +179,9 @@ class _SyncStateCopyWithImpl<$R, $Out> extends ClassCopyWithBase<$R, SyncState,
}));
@override
SyncState $make(CopyWithData data) => SyncState(
init: data.get(#init, or: $value.init),
rootIsolateToken: data.get(#rootIsolateToken, or: $value.rootIsolateToken),
httpClientFactory: data.get(#httpClientFactory, or: $value.httpClientFactory),
securityContext: data.get(#securityContext, or: $value.securityContext),
deviceInfo: data.get(#deviceInfo, or: $value.deviceInfo),
alias: data.get(#alias, or: $value.alias),
@@ -1,15 +1,14 @@
import 'dart:io';
import 'dart:typed_data';
import 'package:common/isolate.dart';
import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/main.dart';
import 'package:common/src/isolate/child/sync_provider.dart';
import 'package:common/src/isolate/dto/isolate_task.dart';
import 'package:common/src/isolate/dto/isolate_task_result.dart';
import 'package:common/src/isolate/dto/send_to_isolate_data.dart';
import 'package:common/src/task/upload/http_upload.dart';
import 'package:common/util/stream.dart';
import 'package:dio/dio.dart';
import 'package:meta/meta.dart';
import 'package:refena/refena.dart';
@@ -53,7 +52,7 @@ class HttpUploadCancelTask implements BaseHttpUploadTask {
/// Map of cancel tokens for each task.
/// Task ID -> CancelToken
final _cancelTokenProvider = Provider((ref) => <int, CancelToken>{});
final _cancelTokenProvider = Provider((ref) => <int, CustomCancelToken>{});
abstract class UriContentStreamResolver {
/// Separate initialization method to create instance in the child isolate.
@@ -106,7 +105,7 @@ Future<void> setupHttpUploadIsolate(
final (streamController, subscription) = fileStream?.digested() ?? (null, null);
try {
final cancelToken = CancelToken();
final cancelToken = CustomCancelToken();
ref.read(_cancelTokenProvider).putIfAbsent(task.id, () => cancelToken);
await ref.read(httpUploadProvider).upload(
+3 -1
View File
@@ -196,6 +196,7 @@ class IsolateHttpUploadAction extends ReduxActionWithResult<IsolateController, P
final progress = _sendTaskAndListenStream(
task: task,
connection: connection,
taskId: taskId,
);
return (
@@ -240,9 +241,10 @@ class IsolateHttpUploadCancelAction extends ReduxAction<IsolateController, Paren
Stream<R> _sendTaskAndListenStream<R, T>({
required T task,
required IsolateConnector<IsolateTaskStreamResult<R>, SendToIsolateData<IsolateTask<T>>> connection,
int? taskId,
}) {
final wrappedTask = IsolateTask(
id: _idProvider.getNextId(),
id: taskId ?? _idProvider.getNextId(),
data: task,
);
+15 -23
View File
@@ -1,18 +1,17 @@
import 'package:common/api_route_builder.dart';
import 'package:common/model/device.dart';
import 'package:common/src/isolate/child/dio_provider.dart';
import 'package:dio/dio.dart';
import 'package:common/src/isolate/child/http_provider.dart';
import 'package:refena/refena.dart';
final httpUploadProvider = ViewProvider((ref) {
final dio = ref.watch(dioProvider).longLiving;
return HttpUploadService(dio);
final client = ref.watch(httpProvider).longLiving;
return HttpUploadService(client);
});
class HttpUploadService {
final Dio _dio;
final CustomHttpClient _client;
HttpUploadService(this._dio);
HttpUploadService(this._client);
Future<void> upload({
required Stream<List<int>> stream,
@@ -23,28 +22,21 @@ class HttpUploadService {
required String fileId,
required String token,
required void Function(double) onSendProgress,
required CancelToken cancelToken,
required CustomCancelToken cancelToken,
}) async {
final stopwatch = Stopwatch()..start();
await _dio.post(
ApiRoute.upload.target(target, query: {
await _client.postStream(
uri: ApiRoute.upload.target(target),
query: {
if (remoteSessionId != null) 'sessionId': remoteSessionId,
'fileId': fileId,
'token': token,
}),
options: Options(
headers: {
'Content-Length': contentLength,
'Content-Type': contentType,
},
),
data: stream,
onSendProgress: (curr, total) {
if (stopwatch.elapsedMilliseconds >= 100) {
stopwatch.reset();
onSendProgress(curr / total);
}
},
headers: {
'Content-Length': contentLength.toString(),
'Content-Type': contentType,
},
stream: stream,
onSendProgress: onSendProgress,
cancelToken: cancelToken,
);
}