Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically. The initial event list will be retrieved when this object is
29 : /// created by the `room.getTimeline()` method.
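///
/// A minimal usage sketch (the callback wiring shown here is an assumption
/// about how a client might consume this class, not prescribed by the SDK):
///
/// ```dart
/// final timeline = await room.getTimeline(
///   onUpdate: () {
///     // Redraw the message list here.
///   },
/// );
/// print('Loaded ${timeline.events.length} events');
/// // Don't forget to call timeline.cancelSubscriptions() when done.
/// ```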
30 :
31 : class Timeline {
32 : final Room room;
33 24 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
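///
/// For example, the reactions aggregated onto an event could be read like
/// this (a sketch using the raw `m.annotation` relationship type, not an
/// official accessor):
///
/// ```dart
/// final reactions = timeline.aggregatedEvents[event.eventId]?['m.annotation'];
/// ```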
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
38 : final void Function()? onUpdate;
39 : final void Function(int index)? onChange;
40 : final void Function(int index)? onInsert;
41 : final void Function(int index)? onRemove;
42 : final void Function()? onNewEvent;
43 :
44 : StreamSubscription<EventUpdate>? sub;
45 : StreamSubscription<SyncUpdate>? roomSub;
46 : StreamSubscription<String>? sessionIdReceivedSub;
47 : StreamSubscription<String>? cancelSendEventSub;
48 : bool isRequestingHistory = false;
49 : bool isRequestingFuture = false;
50 :
51 : bool allowNewEvent = true;
52 : bool isFragmentedTimeline = false;
53 :
54 : final Map<String, Event> _eventCache = {};
55 :
56 : TimelineChunk chunk;
57 :
58 : /// Searches for the event in this timeline. If not
59 : /// found, requests from the server. Requested events
60 : /// are cached.
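///
/// A hedged example, where `eventId` is a placeholder for a real event ID:
///
/// ```dart
/// final event = await timeline.getEventById(eventId);
/// if (event == null) {
///   // Not in the timeline, not cached and not found on the server.
/// }
/// ```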
61 2 : Future<Event?> getEventById(String id) async {
62 4 : for (final event in events) {
63 4 : if (event.eventId == id) return event;
64 : }
65 4 : if (_eventCache.containsKey(id)) return _eventCache[id];
66 4 : final requestedEvent = await room.getEventById(id);
67 : if (requestedEvent == null) return null;
68 4 : _eventCache[id] = requestedEvent;
69 4 : return _eventCache[id];
70 : }
71 :
72 : // When fetching history, we collect the events into the `_historyUpdates` set
73 : // first, and then process them all at once, once we have the full history.
74 : // This ensures that fetching the entire history triggers `onUpdate` only *once*,
75 : // even if /syncs complete while the history is being processed.
76 : bool _collectHistoryUpdates = false;
77 :
78 : // We confirmed that there are no more events to load from the database.
79 : bool _fetchedAllDatabaseEvents = false;
80 :
81 1 : bool get canRequestHistory {
82 2 : if (events.isEmpty) return true;
83 0 : return !_fetchedAllDatabaseEvents ||
84 0 : (room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
85 : }
86 :
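/// Requests up to [historyCount] older events from the store or the server
/// and appends them to the end of [events]. A typical "load more" handler
/// might look like this (the guard is an assumption about UI usage):
///
/// ```dart
/// if (timeline.canRequestHistory) {
///   await timeline.requestHistory(historyCount: 50);
/// }
/// ```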
87 2 : Future<void> requestHistory(
88 : {int historyCount = Room.defaultHistoryCount}) async {
89 2 : if (isRequestingHistory) {
90 : return;
91 : }
92 :
93 2 : isRequestingHistory = true;
94 2 : await _requestEvents(direction: Direction.b, historyCount: historyCount);
95 2 : isRequestingHistory = false;
96 : }
97 :
98 0 : bool get canRequestFuture => !allowNewEvent;
99 :
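/// Requests newer events on a fragmented timeline (for example one opened
/// at a permalink). On a live timeline new events arrive via sync and this
/// method is a no-op. A hedged sketch:
///
/// ```dart
/// if (timeline.canRequestFuture) {
///   await timeline.requestFuture(historyCount: 50);
/// }
/// ```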
100 1 : Future<void> requestFuture(
101 : {int historyCount = Room.defaultHistoryCount}) async {
102 1 : if (allowNewEvent) {
103 : return; // we shouldn't force adding new events if they will be added automatically
104 : }
105 :
106 1 : if (isRequestingFuture) return;
107 1 : isRequestingFuture = true;
108 1 : await _requestEvents(direction: Direction.f, historyCount: historyCount);
109 1 : isRequestingFuture = false;
110 : }
111 :
112 3 : Future<void> _requestEvents(
113 : {int historyCount = Room.defaultHistoryCount,
114 : required Direction direction}) async {
115 4 : onUpdate?.call();
116 :
117 : try {
118 : // Look up events in the database first. With a fragmented view, we skip the database cache.
119 3 : final eventsFromStore = isFragmentedTimeline
120 : ? null
121 8 : : await room.client.database?.getEventList(
122 2 : room,
123 4 : start: events.length,
124 : limit: historyCount,
125 : );
126 :
127 2 : if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
128 : // Fetch all the users we have got here from the database.
129 0 : for (final event in events) {
130 0 : if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
131 : continue;
132 : }
133 : final dbUser =
134 0 : await room.client.database?.getUser(event.senderId, room);
135 0 : if (dbUser != null) room.setState(dbUser);
136 : }
137 :
138 0 : if (direction == Direction.b) {
139 0 : events.addAll(eventsFromStore);
140 0 : final startIndex = events.length - eventsFromStore.length;
141 0 : final endIndex = events.length;
142 0 : for (var i = startIndex; i < endIndex; i++) {
143 0 : onInsert?.call(i);
144 : }
145 : } else {
146 0 : events.insertAll(0, eventsFromStore);
147 0 : final startIndex = eventsFromStore.length;
148 : final endIndex = 0;
149 0 : for (var i = startIndex; i > endIndex; i--) {
150 0 : onInsert?.call(i);
151 : }
152 : }
153 : } else {
154 3 : _fetchedAllDatabaseEvents = true;
155 6 : Logs().i('No more events found in the store. Request from server...');
156 :
157 3 : if (isFragmentedTimeline) {
158 1 : await getRoomEvents(
159 : historyCount: historyCount,
160 : direction: direction,
161 : );
162 : } else {
163 4 : if (room.prev_batch == null) {
164 0 : Logs().i('No more events to request from server...');
165 : } else {
166 4 : await room.requestHistory(
167 : historyCount: historyCount,
168 : direction: direction,
169 2 : onHistoryReceived: () {
170 2 : _collectHistoryUpdates = true;
171 : },
172 : );
173 : }
174 : }
175 : }
176 : } finally {
177 3 : _collectHistoryUpdates = false;
178 3 : isRequestingHistory = false;
179 4 : onUpdate?.call();
180 : }
181 : }
182 :
183 : /// Request more previous events from the server. [historyCount] defines how many events
184 : /// should be received at most. When the request is answered, [onHistoryReceived] will be
185 : /// triggered **before** the historical events are published in the onEvent stream.
186 : /// Returns the actual count of received timeline events.
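///
/// For example, paging backwards on a fragmented timeline (a sketch; the
/// counts are arbitrary):
///
/// ```dart
/// final received = await timeline.getRoomEvents(
///   historyCount: 30,
///   direction: Direction.b,
/// );
/// if (received < 30) {
///   // We probably reached the start of the visible history.
/// }
/// ```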
187 1 : Future<int> getRoomEvents(
188 : {int historyCount = Room.defaultHistoryCount,
189 : direction = Direction.b}) async {
190 3 : final resp = await room.client.getRoomEvents(
191 2 : room.id,
192 : direction,
193 3 : from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
194 : limit: historyCount,
195 3 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
196 : );
197 :
198 1 : if (resp.end == null) {
199 2 : Logs().w('We reached the end of the timeline');
200 : }
201 :
202 2 : final newNextBatch = direction == Direction.b ? resp.start : resp.end;
203 2 : final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
204 :
205 1 : final type = direction == Direction.b
206 : ? EventUpdateType.history
207 : : EventUpdateType.timeline;
208 :
209 3 : if ((resp.state?.length ?? 0) == 0 &&
210 3 : resp.start != resp.end &&
211 : newPrevBatch != null &&
212 : newNextBatch != null) {
213 1 : if (type == EventUpdateType.history) {
214 0 : Logs().w(
215 0 : '[nav] we can still request history prevBatch: $type $newPrevBatch');
216 : } else {
217 2 : Logs().w(
218 1 : '[nav] we can still request timeline nextBatch: $type $newNextBatch');
219 : }
220 : }
221 :
222 : final newEvents =
223 6 : resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
224 :
225 1 : if (!allowNewEvent) {
226 3 : if (resp.start == resp.end ||
227 3 : (resp.end == null && direction == Direction.f)) allowNewEvent = true;
228 :
229 1 : if (allowNewEvent) {
230 2 : Logs().d('We now allow sync update into the timeline.');
231 1 : newEvents.addAll(
232 5 : await room.client.database?.getEventList(room, onlySending: true) ??
233 0 : []);
234 : }
235 : }
236 :
237 : // Try to decrypt encrypted events but don't update the database.
238 2 : if (room.encrypted && room.client.encryptionEnabled) {
239 0 : for (var i = 0; i < newEvents.length; i++) {
240 0 : if (newEvents[i].type == EventTypes.Encrypted) {
241 0 : newEvents[i] = await room.client.encryption!.decryptRoomEvent(
242 0 : room.id,
243 0 : newEvents[i],
244 : );
245 : }
246 : }
247 : }
248 :
249 : // update chunk anchors
250 1 : if (type == EventUpdateType.history) {
251 0 : chunk.prevBatch = newPrevBatch ?? '';
252 :
253 0 : final offset = chunk.events.length;
254 :
255 0 : chunk.events.addAll(newEvents);
256 :
257 0 : for (var i = 0; i < newEvents.length; i++) {
258 0 : onInsert?.call(i + offset);
259 : }
260 : } else {
261 2 : chunk.nextBatch = newNextBatch ?? '';
262 4 : chunk.events.insertAll(0, newEvents.reversed);
263 :
264 3 : for (var i = 0; i < newEvents.length; i++) {
265 2 : onInsert?.call(i);
266 : }
267 : }
268 :
269 1 : if (onUpdate != null) {
270 2 : onUpdate!();
271 : }
272 2 : return resp.chunk.length;
273 : }
274 :
275 8 : Timeline(
276 : {required this.room,
277 : this.onUpdate,
278 : this.onChange,
279 : this.onInsert,
280 : this.onRemove,
281 : this.onNewEvent,
282 : required this.chunk}) {
283 56 : sub = room.client.onEvent.stream.listen(_handleEventUpdate);
284 :
285 : // If the timeline is limited we want to clear our events cache
286 40 : roomSub = room.client.onSync.stream
287 55 : .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
288 16 : .listen(_removeEventsNotInThisSync);
289 :
290 8 : sessionIdReceivedSub =
291 40 : room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
292 8 : cancelSendEventSub =
293 48 : room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
294 :
295 : // we want to populate our aggregated events
296 14 : for (final e in events) {
297 6 : addAggregatedEvent(e);
298 : }
299 :
300 : // we are using a fragmented timeline
301 24 : if (chunk.nextBatch != '') {
302 1 : allowNewEvent = false;
303 1 : isFragmentedTimeline = true;
304 : // fragmented timelines never read from the database.
305 1 : _fetchedAllDatabaseEvents = true;
306 : }
307 : }
308 :
309 2 : void _cleanUpCancelledEvent(String eventId) {
310 2 : final i = _findEvent(event_id: eventId);
311 6 : if (i < events.length) {
312 6 : removeAggregatedEvent(events[i]);
313 4 : events.removeAt(i);
314 4 : onRemove?.call(i);
315 4 : onUpdate?.call();
316 : }
317 : }
318 :
319 : /// Removes all entries from [events] which are not in this SyncUpdate.
320 2 : void _removeEventsNotInThisSync(SyncUpdate sync) {
321 15 : final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
322 4 : final keepEventIds = newSyncEvents.map((e) => e.eventId);
323 7 : events.removeWhere((e) => !keepEventIds.contains(e.eventId));
324 : }
325 :
326 : /// Don't forget to call this before you dismiss this object!
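///
/// For example in a Flutter widget (the `dispose` override is an assumption
/// about the host application, not part of this SDK):
///
/// ```dart
/// @override
/// void dispose() {
///   timeline.cancelSubscriptions();
///   super.dispose();
/// }
/// ```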
327 0 : void cancelSubscriptions() {
328 : // ignore: discarded_futures
329 0 : sub?.cancel();
330 : // ignore: discarded_futures
331 0 : roomSub?.cancel();
332 : // ignore: discarded_futures
333 0 : sessionIdReceivedSub?.cancel();
334 : // ignore: discarded_futures
335 0 : cancelSendEventSub?.cancel();
336 : }
337 :
338 1 : void _sessionKeyReceived(String sessionId) async {
339 : var decryptAtLeastOneEvent = false;
340 1 : Future<void> decryptFn() async {
341 3 : final encryption = room.client.encryption;
342 3 : if (!room.client.encryptionEnabled || encryption == null) {
343 : return;
344 : }
345 4 : for (var i = 0; i < events.length; i++) {
346 4 : if (events[i].type == EventTypes.Encrypted &&
347 4 : events[i].messageType == MessageTypes.BadEncrypted &&
348 0 : events[i].content['session_id'] == sessionId) {
349 0 : events[i] = await encryption.decryptRoomEvent(
350 0 : room.id,
351 0 : events[i],
352 : store: true,
353 : updateType: EventUpdateType.history,
354 : );
355 0 : addAggregatedEvent(events[i]);
356 0 : onChange?.call(i);
357 0 : if (events[i].type != EventTypes.Encrypted) {
358 : decryptAtLeastOneEvent = true;
359 : }
360 : }
361 : }
362 : }
363 :
364 3 : if (room.client.database != null) {
365 4 : await room.client.database?.transaction(decryptFn);
366 : } else {
367 0 : await decryptFn();
368 : }
369 0 : if (decryptAtLeastOneEvent) onUpdate?.call();
370 : }
371 :
372 : /// Request the keys for undecryptable events of this timeline
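///
/// A hedged sketch, e.g. after a batch of undecryptable events has been
/// scrolled into view:
///
/// ```dart
/// timeline.requestKeys(onlineKeyBackupOnly: false);
/// ```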
373 0 : void requestKeys({
374 : bool tryOnlineBackup = true,
375 : bool onlineKeyBackupOnly = true,
376 : }) {
377 0 : for (final event in events) {
378 0 : if (event.type == EventTypes.Encrypted &&
379 0 : event.messageType == MessageTypes.BadEncrypted &&
380 0 : event.content['can_request_session'] == true) {
381 0 : final sessionId = event.content.tryGet<String>('session_id');
382 0 : final senderKey = event.content.tryGet<String>('sender_key');
383 : if (sessionId != null && senderKey != null) {
384 0 : room.client.encryption?.keyManager.maybeAutoRequest(
385 0 : room.id,
386 : sessionId,
387 : senderKey,
388 : tryOnlineBackup: tryOnlineBackup,
389 : onlineKeyBackupOnly: onlineKeyBackupOnly,
390 : );
391 : }
392 : }
393 : }
394 : }
395 :
396 : /// Set the read marker to the last synced event in this timeline.
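///
/// For example, when the room is currently visible (whether `public` makes
/// the read receipt public or private is an assumption based on the
/// parameter name):
///
/// ```dart
/// await timeline.setReadMarker(public: true);
/// ```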
397 2 : Future<void> setReadMarker({String? eventId, bool? public}) async {
398 : eventId ??=
399 12 : events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
400 : if (eventId == null) return;
401 4 : return room.setReadMarker(eventId, mRead: eventId, public: public);
402 : }
403 :
404 7 : int _findEvent({String? event_id, String? unsigned_txid}) {
405 : // We want to find any existing event where either the passed event_id or the passed unsigned_txid
406 : // matches either the event_id or the transaction_id of the existing event.
407 : // For that we create two sets: searchNeedle (what we search for) and searchHaystack (what we check against).
408 : // If the intersection of the two sets is non-empty, we have at least one matching pair,
409 : // which means we found our element.
410 : final searchNeedle = <String>{};
411 : if (event_id != null) {
412 7 : searchNeedle.add(event_id);
413 : }
414 : if (unsigned_txid != null) {
415 4 : searchNeedle.add(unsigned_txid);
416 : }
417 : int i;
418 28 : for (i = 0; i < events.length; i++) {
419 21 : final searchHaystack = <String>{events[i].eventId};
420 :
421 28 : final txnid = events[i].unsigned?.tryGet<String>('transaction_id');
422 : if (txnid != null) {
423 4 : searchHaystack.add(txnid);
424 : }
425 14 : if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
426 : break;
427 : }
428 : }
429 : return i;
430 : }
431 :
432 3 : void _removeEventFromSet(Set<Event> eventSet, Event event) {
433 6 : eventSet.removeWhere((e) =>
434 6 : e.matchesEventOrTransactionId(event.eventId) ||
435 3 : event.unsigned != null &&
436 3 : e.matchesEventOrTransactionId(
437 6 : event.unsigned?.tryGet<String>('transaction_id')));
438 : }
439 :
440 8 : void addAggregatedEvent(Event event) {
441 : // we want to add an event to the aggregation tree
442 8 : final relationshipType = event.relationshipType;
443 8 : final relationshipEventId = event.relationshipEventId;
444 : if (relationshipType == null || relationshipEventId == null) {
445 : return; // nothing to do
446 : }
447 6 : final events = (aggregatedEvents[relationshipEventId] ??=
448 6 : <String, Set<Event>>{})[relationshipType] ??= <Event>{};
449 : // remove a potential old event
450 3 : _removeEventFromSet(events, event);
451 : // add the new one
452 3 : events.add(event);
453 3 : if (onChange != null) {
454 0 : final index = _findEvent(event_id: relationshipEventId);
455 0 : onChange?.call(index);
456 : }
457 : }
458 :
459 3 : void removeAggregatedEvent(Event event) {
460 9 : aggregatedEvents.remove(event.eventId);
461 3 : if (event.unsigned != null) {
462 12 : aggregatedEvents.remove(event.unsigned?['transaction_id']);
463 : }
464 7 : for (final types in aggregatedEvents.values) {
465 2 : for (final events in types.values) {
466 1 : _removeEventFromSet(events, event);
467 : }
468 : }
469 : }
470 :
471 7 : void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
472 : try {
473 28 : if (eventUpdate.roomID != room.id) return;
474 :
475 14 : if (eventUpdate.type != EventUpdateType.timeline &&
476 12 : eventUpdate.type != EventUpdateType.history) {
477 : return;
478 : }
479 :
480 14 : if (eventUpdate.type == EventUpdateType.timeline) {
481 7 : onNewEvent?.call();
482 : }
483 :
484 7 : if (!allowNewEvent) return;
485 :
486 21 : final status = eventStatusFromInt(eventUpdate.content['status'] ??
487 15 : (eventUpdate.content['unsigned'] is Map<String, dynamic>
488 15 : ? eventUpdate.content['unsigned'][messageSendingStatusKey]
489 : : null) ??
490 4 : EventStatus.synced.intValue);
491 :
492 7 : final i = _findEvent(
493 14 : event_id: eventUpdate.content['event_id'],
494 21 : unsigned_txid: eventUpdate.content['unsigned'] is Map
495 21 : ? eventUpdate.content['unsigned']['transaction_id']
496 : : null);
497 :
498 21 : if (i < events.length) {
499 : // if the old status is larger than the new one, we also want to preserve the old status
500 21 : final oldStatus = events[i].status;
501 21 : events[i] = Event.fromJson(
502 7 : eventUpdate.content,
503 7 : room,
504 : );
505 : // do we preserve the status? we should allow 0 -> -1 updates and status increases
506 14 : if ((latestEventStatus(status, oldStatus) == oldStatus) &&
507 11 : !(status.isError && oldStatus.isSending)) {
508 21 : events[i].status = oldStatus;
509 : }
510 21 : addAggregatedEvent(events[i]);
511 9 : onChange?.call(i);
512 : } else {
513 6 : final newEvent = Event.fromJson(
514 6 : eventUpdate.content,
515 6 : room,
516 : );
517 :
518 12 : if (eventUpdate.type == EventUpdateType.history &&
519 6 : events.indexWhere(
520 18 : (e) => e.eventId == eventUpdate.content['event_id']) !=
521 3 : -1) return;
522 12 : var index = events.length;
523 12 : if (eventUpdate.type == EventUpdateType.history) {
524 6 : events.add(newEvent);
525 : } else {
526 8 : index = events.firstIndexWhereNotError;
527 8 : events.insert(index, newEvent);
528 : }
529 10 : onInsert?.call(index);
530 :
531 6 : addAggregatedEvent(newEvent);
532 : }
533 :
534 : // Handle redaction events
535 21 : if (eventUpdate.content['type'] == EventTypes.Redaction) {
536 : final index =
537 3 : _findEvent(event_id: eventUpdate.content.tryGet<String>('redacts'));
538 3 : if (index < events.length) {
539 3 : removeAggregatedEvent(events[index]);
540 :
541 : // Is the redacted event a reaction? Then update the event this
542 : // belongs to:
543 1 : if (onChange != null) {
544 3 : final relationshipEventId = events[index].relationshipEventId;
545 : if (relationshipEventId != null) {
546 0 : onChange?.call(_findEvent(event_id: relationshipEventId));
547 : return;
548 : }
549 : }
550 :
551 4 : events[index].setRedactionEvent(Event.fromJson(
552 1 : eventUpdate.content,
553 1 : room,
554 : ));
555 2 : onChange?.call(index);
556 : }
557 : }
558 :
559 7 : if (update && !_collectHistoryUpdates) {
560 9 : onUpdate?.call();
561 : }
562 : } catch (e, s) {
563 0 : Logs().w('Handle event update failed', e, s);
564 : }
565 : }
566 :
567 0 : @Deprecated('Use [startSearch] instead.')
568 : Stream<List<Event>> searchEvent({
569 : String? searchTerm,
570 : int requestHistoryCount = 100,
571 : int maxHistoryRequests = 10,
572 : String? sinceEventId,
573 : int? limit,
574 : bool Function(Event)? searchFunc,
575 : }) =>
576 0 : startSearch(
577 : searchTerm: searchTerm,
578 : requestHistoryCount: requestHistoryCount,
579 : maxHistoryRequests: maxHistoryRequests,
580 : // ignore: deprecated_member_use_from_same_package
581 : sinceEventId: sinceEventId,
582 : limit: limit,
583 : searchFunc: searchFunc,
584 0 : ).map((result) => result.$1);
585 :
586 : /// Searches for [searchTerm] in this timeline. It first searches in the
587 : /// cache, then in the database and then on the server. The search can
588 : /// take a while, which is why this returns a stream so that the events
589 : /// found so far can be displayed right away.
590 : /// Override [searchFunc] if you need a different search. This will then
591 : /// ignore [searchTerm].
592 : /// Returns the list of events and the next prevBatch at the end of the
593 : /// search.
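///
/// A hedged usage sketch (the search term and limit are arbitrary):
///
/// ```dart
/// await for (final (matches, nextBatch) in timeline.startSearch(
///   searchTerm: 'pizza',
///   limit: 20,
/// )) {
///   // Render the partial results; `nextBatch` can later be passed as
///   // [prevBatch] to continue the search from where it stopped.
///   print('${matches.length} matches so far (next batch: $nextBatch)');
/// }
/// ```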
594 0 : Stream<(List<Event>, String?)> startSearch({
595 : String? searchTerm,
596 : int requestHistoryCount = 100,
597 : int maxHistoryRequests = 10,
598 : String? prevBatch,
599 : @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
600 : int? limit,
601 : bool Function(Event)? searchFunc,
602 : }) async* {
603 0 : assert(searchTerm != null || searchFunc != null);
604 0 : searchFunc ??= (event) =>
605 0 : event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
606 0 : final found = <Event>[];
607 :
608 : if (sinceEventId == null) {
609 : // Search locally
610 0 : for (final event in events) {
611 0 : if (searchFunc(event)) {
612 0 : yield (found..add(event), null);
613 : }
614 : }
615 :
616 : // Search in database
617 0 : var start = events.length;
618 : while (true) {
619 0 : final eventsFromStore = await room.client.database?.getEventList(
620 0 : room,
621 : start: start,
622 : limit: requestHistoryCount,
623 : ) ??
624 0 : [];
625 0 : if (eventsFromStore.isEmpty) break;
626 0 : start += eventsFromStore.length;
627 0 : for (final event in eventsFromStore) {
628 0 : if (searchFunc(event)) {
629 0 : yield (found..add(event), null);
630 : }
631 : }
632 : }
633 : }
634 :
635 : // Search on the server
636 0 : prevBatch ??= room.prev_batch;
637 : if (sinceEventId != null) {
638 : prevBatch =
639 0 : (await room.client.getEventContext(room.id, sinceEventId)).end;
640 : }
641 0 : final encryption = room.client.encryption;
642 0 : for (var i = 0; i < maxHistoryRequests; i++) {
643 : if (prevBatch == null) break;
644 0 : if (limit != null && found.length >= limit) break;
645 : try {
646 0 : final resp = await room.client.getRoomEvents(
647 0 : room.id,
648 : Direction.b,
649 : from: prevBatch,
650 : limit: requestHistoryCount,
651 0 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
652 : );
653 0 : for (final matrixEvent in resp.chunk) {
654 0 : var event = Event.fromMatrixEvent(matrixEvent, room);
655 0 : if (event.type == EventTypes.Encrypted && encryption != null) {
656 0 : event = await encryption.decryptRoomEvent(room.id, event);
657 0 : if (event.type == EventTypes.Encrypted &&
658 0 : event.messageType == MessageTypes.BadEncrypted &&
659 0 : event.content['can_request_session'] == true) {
660 : // Await requestKey() here to ensure decrypted message bodies
661 0 : await event.requestKey();
662 : }
663 : }
664 0 : if (searchFunc(event)) {
665 0 : yield (found..add(event), resp.end);
666 0 : if (limit != null && found.length >= limit) break;
667 : }
668 : }
669 0 : prevBatch = resp.end;
670 : // We are at the beginning of the room
671 0 : if (resp.chunk.length < requestHistoryCount) break;
672 0 : } on MatrixException catch (e) {
673 : // We have no permission anymore to request the history
674 0 : if (e.error == MatrixError.M_FORBIDDEN) {
675 : break;
676 : }
677 : rethrow;
678 : }
679 : }
680 : return;
681 : }
682 : }
683 :
684 : extension on List<Event> {
685 4 : int get firstIndexWhereNotError {
686 4 : if (isEmpty) return 0;
687 16 : final index = indexWhere((event) => !event.status.isError);
688 9 : if (index == -1) return length;
689 : return index;
690 : }
691 : }