
import {filter,  take, publishReplay } from 'rxjs/operators';
import { BehaviorSubject, Observable, from, ConnectableObservable } from 'rxjs';

/**
 * Queries data from the server (or from the in-memory cache when requested) and lets
 * consumers subscribe to changes of that data.
 *
 * Concrete services implement `getDataFromServer`; this base class keeps one stream of
 * values per key so that fetched results can be re-used and observed.
 */
export abstract class QueryService<T> {
  /**
   * Maps each key to its associated stream of data. A BehaviorSubject is used so that the
   * current value is available to new subscribers. By convention every stream is
   * initialised with `undefined`, meaning "no data received yet".
   *
   * A Map is used instead of a plain object so that keys such as "constructor" or
   * "toString" are not mistaken for existing entries inherited from Object.prototype.
   */
  private readonly changes = new Map<string, BehaviorSubject<T | undefined>>();

  /**
   * Queries the data for the given key from the server or from the in-memory cache.
   *
   * NOTE: the internal subscription that feeds the cache only handles values. If the
   * fetch fails, the cached stream for the key stays at `undefined`, so cached queries
   * for that key keep waiting until a later fetch succeeds.
   *
   * @param useCached When true and a stream already exists for the key (data is cached
   *   or a request is in progress), no new request is made.
   * @param key Identifies the data to query.
   * @returns For a cached query, an observable that emits the first defined value of the
   *   key's stream and then completes. Otherwise, the (already connected) result of a
   *   fresh server request.
   */
  protected query(useCached: boolean, key: string): Observable<T> {
    // Return cached data if requested and the cached data exists or a request is in progress.
    const existing = this.changes.get(key);
    if (existing && useCached) {
      return existing.pipe(
        filter((data): data is T => data !== undefined),
        take(1)
      );
    }

    // Make sure there is a stream of data for the given key.
    const subject = this.getOrCreateChanges(key);

    // Fetch the data from the server and convert to hot (multicast) observable so that each subscriber does not cause another HTTP request.
    // TODO - why do we have to create a new Observable for the publishReplay operator to work?
    const queryResult = from(this.getDataFromServer(key)).pipe(publishReplay(1)) as ConnectableObservable<T>;
    queryResult.connect();

    // Push new data to the stream of changes.
    queryResult.subscribe(data => {
      subject.next(data);
    });

    return queryResult;
  }

  /**
   * Hook for concrete classes to provide the implementation of getting data from the server.
   *
   * @param key Identifies the data to fetch.
   */
  protected abstract getDataFromServer(key: string): Observable<T>;

  /**
   * Gets an observable stream of changes to the cached data for the given key.
   *
   * If no stream exists yet for the key, one is created and a server request is started
   * so that the stream eventually receives data.
   *
   * @param key Identifies the data to observe.
   * @returns The key's stream of data with `undefined` values filtered out, so subscribers
   *   only receive defined data values.
   */
  protected cachedDataChanges(key: string): Observable<T> {
    const isFirstRequest = !this.changes.has(key);

    // Register the stream before querying so that the query pushes into this same stream.
    const subject = this.getOrCreateChanges(key);

    if (isFirstRequest) {
      // Proactively fetch the data; the returned observable is not needed here because
      // the query itself pushes the result into the stream of changes.
      this.query(false, key);
    }

    return subject.pipe(filter((data): data is T => data !== undefined));
  }

  /** Returns the stream for the given key, creating it (initialised with `undefined`) if missing. */
  private getOrCreateChanges(key: string): BehaviorSubject<T | undefined> {
    let subject = this.changes.get(key);
    if (!subject) {
      subject = new BehaviorSubject<T | undefined>(undefined);
      this.changes.set(key, subject);
    }
    return subject;
  }
}