// javascript

// examples

// examples.js
/**
 * Async Iterators & Streams - Examples
 * Working with data that arrives over time
 */

/**
 * Promise-based wrapper around `setTimeout`.
 *
 * @param {number} ms - Timeout passed straight to `setTimeout`.
 * @returns {Promise<undefined>} Settles (with no value) when the timer fires.
 */
const delay = (ms) =>
  new Promise((done) => {
    setTimeout(done, ms);
  });

// =============================================================================
// 1. BASIC ASYNC ITERATOR
// =============================================================================

console.log('--- Basic Async Iterator ---');

// An object is async-iterable when it exposes a [Symbol.asyncIterator] method.
// Here that method is an async generator, so `for await...of` can consume it.
const simpleAsyncIterable = {
  data: [1, 2, 3, 4, 5],

  // Emits every entry of `this.data` in order, pausing before each one.
  [Symbol.asyncIterator]: async function* () {
    for (const value of this.data) {
      await delay(100); // stand-in for an asynchronous fetch
      yield value;
    }
  },
};

/**
 * Drains `simpleAsyncIterable` with `for await...of`, logging a start line,
 * one line per received value, and a completion line.
 *
 * @returns {Promise<void>}
 */
async function consumeAsyncIterable() {
  const report = (value) => console.log('Received:', value);

  console.log('Starting async iteration...');
  for await (const value of simpleAsyncIterable) {
    report(value);
  }
  console.log('Async iteration complete');
}

consumeAsyncIterable();

// =============================================================================
// 2. ASYNC GENERATOR FUNCTIONS
// =============================================================================

console.log('\n--- Async Generator Functions ---');

/**
 * Async generator that counts from `start` up to and including `end`,
 * awaiting `delay(delayMs)` before each value is produced.
 * Produces nothing when `start` is greater than `end`.
 *
 * @param {number} start - First value to emit.
 * @param {number} end - Last value to emit (inclusive).
 * @param {number} [delayMs=100] - Pause before every emitted value.
 */
async function* countAsync(start, end, delayMs = 100) {
  let current = start;
  while (current <= end) {
    await delay(delayMs);
    yield current;
    current++;
  }
}

/**
 * Logs a header, then every value produced by `countAsync(1, 5, 50)`.
 *
 * @returns {Promise<void>}
 */
async function useAsyncGenerator() {
  console.log('Counting with async generator:');

  const counter = countAsync(1, 5, 50);
  for await (const value of counter) {
    console.log('Count:', value);
  }
}

setTimeout(useAsyncGenerator, 1000);

// =============================================================================
// 3. PAGINATED API SIMULATION
// =============================================================================

console.log('\n--- Paginated API ---');

// In-memory stand-in for a paginated API: 25 records served one page at a time.
const mockDatabase = {
  items: Array.from({ length: 25 }, (_, index) => {
    const id = index + 1;
    return { id, name: `Item ${id}` };
  }),

  /**
   * Returns one page of `items` after a simulated network delay.
   *
   * @param {number} page - 1-based page number.
   * @param {number} [pageSize=10] - Records per page.
   * @returns {Promise<{items: object[], page: number, hasMore: boolean}>}
   *   `hasMore` is true while records remain beyond this page.
   */
  async fetch(page, pageSize = 10) {
    await delay(100); // simulated network latency

    const offset = (page - 1) * pageSize;
    const limit = offset + pageSize;

    return {
      items: this.items.slice(offset, limit),
      page,
      hasMore: limit < this.items.length,
    };
  },
};

/**
 * Async generator that walks `mockDatabase` page by page and yields each
 * record individually, so callers never deal with pagination themselves.
 * The next page is requested only once the current page's records have
 * been consumed, and iteration ends after a page reporting `hasMore` falsy.
 *
 * @param {number} [pageSize=10] - Page size forwarded to `mockDatabase.fetch`.
 */
async function* fetchAllPages(pageSize = 10) {
  for (let page = 1; ; page++) {
    const { items, hasMore } = await mockDatabase.fetch(page, pageSize);
    console.log(`Fetched page ${page} (${items.length} items)`);

    for (const item of items) {
      yield item;
    }

    if (!hasMore) {
      return;
    }
  }
}

/**
 * Collects every record produced by `fetchAllPages(10)` into an array,
 * logging a header first and the total count at the end.
 *
 * @returns {Promise<Array>} All records, in the order they were yielded.
 */
async function getAllItems() {
  console.log('Fetching all items with async generator:');

  const collected = [];
  const records = fetchAllPages(10);
  for await (const record of records) {
    collected.push(record);
  }

  console.log(`Total items fetched: ${collected.length}`);
  return collected;
}

setTimeout(getAllItems, 2000);

// =============================================================================
// 4. TRANSFORMING ASYNC ITERATORS
// =============================================================================

console.log('\n--- Transforming Async Iterators ---');

/**
 * Lazy `map` for async iterables: yields `fn(item)` for every item.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} fn - Mapper; may return a value or a promise (it is awaited).
 */
async function* asyncMap(iterable, fn) {
  for await (const input of iterable) {
    const output = await fn(input);
    yield output;
  }
}

/**
 * Lazy `filter` for async iterables: yields only the items for which
 * `predicate(item)` is truthy.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may be sync or async (its result is awaited).
 */
async function* asyncFilter(iterable, predicate) {
  for await (const candidate of iterable) {
    const keep = await predicate(candidate);
    if (!keep) continue;
    yield candidate;
  }
}

/**
 * Yields at most the first `n` items of an async iterable.
 *
 * The source is closed as soon as the n-th item has been handed out, so
 * nothing beyond the requested items is pulled from it. Checking the limit
 * at the top of the loop instead would fetch, and wait for, one extra item
 * just to discard it. For `n <= 0` the source is never touched.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {number} n - Maximum number of items to yield.
 */
async function* asyncTake(iterable, n) {
  if (n <= 0) return;

  let taken = 0;
  for await (const item of iterable) {
    yield item;
    taken++;
    // Leaving the loop here closes the source before another item is requested.
    if (taken >= n) break;
  }
}

/**
 * Drops the first `n` items of an async iterable and yields the rest.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {number} n - Number of leading items to discard.
 */
async function* asyncSkip(iterable, n) {
  let position = 0;
  for await (const item of iterable) {
    const pastPrefix = position >= n;
    position++;
    if (pastPrefix) {
      yield item;
    }
  }
}

/**
 * Demo: composes the async helpers into one lazy pipeline
 * (count 1..10 -> double -> keep values above 10 -> take 3) and logs each result.
 *
 * @returns {Promise<void>}
 */
async function transformDemo() {
  console.log('Transform demonstration:');

  const double = (n) => n * 2;
  const aboveTen = (n) => n > 10;

  const pipeline = asyncTake(
    asyncFilter(asyncMap(countAsync(1, 10, 30), double), aboveTen),
    3
  );

  for await (const value of pipeline) {
    console.log('Result:', value);
  }
}

setTimeout(transformDemo, 4000);

// =============================================================================
// 5. ASYNC ITERATOR UTILITIES
// =============================================================================

console.log('\n--- Async Iterator Utilities ---');

/**
 * Drains an async iterable into an array.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @returns {Promise<Array>} Every item, in iteration order.
 */
async function asyncToArray(iterable) {
  const collected = [];
  for await (const value of iterable) {
    collected.push(value);
  }
  return collected;
}

/**
 * Folds an async iterable into a single value.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} reducer - `(accumulator, item) => next`; may be async.
 * @param {*} initial - Starting accumulator; returned as-is for an empty source.
 * @returns {Promise<*>} The final accumulator.
 */
async function asyncReduce(iterable, reducer, initial) {
  let state = initial;
  for await (const value of iterable) {
    const next = await reducer(state, value);
    state = next;
  }
  return state;
}

/**
 * Resolves with the first item that satisfies `predicate`, or `undefined`
 * when the source ends without a match. Iteration stops at the first match.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may be sync or async.
 * @returns {Promise<*>}
 */
async function asyncFind(iterable, predicate) {
  for await (const candidate of iterable) {
    const matches = await predicate(candidate);
    if (!matches) continue;
    return candidate;
  }
  return undefined;
}

/**
 * Resolves `true` as soon as one item satisfies `predicate`, otherwise
 * `false` (including for an empty source).
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may be sync or async.
 * @returns {Promise<boolean>}
 */
async function asyncSome(iterable, predicate) {
  let matched = false;
  for await (const candidate of iterable) {
    matched = Boolean(await predicate(candidate));
    if (matched) break;
  }
  return matched;
}

/**
 * Resolves `false` as soon as one item fails `predicate`, otherwise `true`
 * (including for an empty source).
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may be sync or async.
 * @returns {Promise<boolean>}
 */
async function asyncEvery(iterable, predicate) {
  let allPass = true;
  for await (const candidate of iterable) {
    if (await predicate(candidate)) continue;
    allPass = false;
    break;
  }
  return allPass;
}

// =============================================================================
// 6. MERGING ASYNC ITERATORS
// =============================================================================

console.log('\n--- Merging Async Iterators ---');

/**
 * Merges several async iterables into one stream, yielding each value as
 * soon as whichever source produces it (race style). Finishes once every
 * source is exhausted.
 *
 * Each source has at most one `next()` in flight. If the consumer stops
 * early, or a source rejects, every source that has not finished is asked
 * to close via `return()` so generator `finally` blocks and similar
 * clean-up still run.
 *
 * @param {...AsyncIterable} iterables - Sources; each must implement
 *   `Symbol.asyncIterator`.
 */
async function* asyncMerge(...iterables) {
  const iterators = iterables.map((iterable) =>
    iterable[Symbol.asyncIterator]()
  );

  // Tag every next() with its source index so the race winner is identifiable.
  const pull = (index) =>
    iterators[index].next().then((result) => ({ index, result }));

  const pending = new Map(); // source index -> in-flight next()
  const open = new Set(); // sources that have not reported `done`
  iterators.forEach((_, index) => {
    open.add(index);
    pending.set(index, pull(index));
  });

  try {
    while (pending.size > 0) {
      const { index, result } = await Promise.race(pending.values());
      pending.delete(index);

      if (result.done) {
        open.delete(index);
        continue;
      }

      yield result.value;

      // Only ask this source for more once its previous value was consumed.
      pending.set(index, pull(index));
    }
  } finally {
    // Empty after a normal run. Otherwise close what is still open, without
    // awaiting: a source with a next() in flight only settles return() after
    // that next() does, and the caller must not be blocked on it. Close
    // failures are ignored so they cannot mask the original exit reason.
    for (const index of open) {
      const iterator = iterators[index];
      if (typeof iterator.return === 'function') {
        Promise.resolve()
          .then(() => iterator.return())
          .catch(() => {});
      }
    }
  }
}

/**
 * Zips several async iterables: each round pulls one value from every
 * source in parallel and yields them as an array, in argument order.
 * Stops as soon as any source is exhausted.
 *
 * With no sources nothing is yielded; without that guard every round would
 * succeed with an empty array and the generator would never end. On exit
 * (shortest source finished, consumer stopped early, or a source rejected)
 * the sources that have not finished are closed via `return()`.
 *
 * @param {...AsyncIterable} iterables - Sources; each must implement
 *   `Symbol.asyncIterator`.
 */
async function* asyncZip(...iterables) {
  if (iterables.length === 0) return;

  const iterators = iterables.map((iterable) =>
    iterable[Symbol.asyncIterator]()
  );
  const finished = new Set(); // indices of sources that reported `done`

  // Best-effort close: a failing return() must not mask the original outcome.
  const closeQuietly = async (iterator) => {
    try {
      await iterator.return();
    } catch (err) {
      // Deliberately ignored; see above.
    }
  };

  try {
    while (true) {
      const results = await Promise.all(
        iterators.map((iterator) => iterator.next())
      );

      results.forEach((result, index) => {
        if (result.done) finished.add(index);
      });
      if (finished.size > 0) {
        return;
      }

      yield results.map((result) => result.value);
    }
  } finally {
    await Promise.all(
      iterators
        .filter(
          (iterator, index) =>
            !finished.has(index) && typeof iterator.return === 'function'
        )
        .map(closeQuietly)
    );
  }
}

/**
 * Demo: merges a fast counter (1..3 every 50 ms) with a slow one
 * (10..12 every 150 ms) and logs the values in arrival order.
 *
 * @returns {Promise<void>}
 */
async function mergeDemo() {
  console.log('Merge demonstration:');

  const sources = [countAsync(1, 3, 50), countAsync(10, 12, 150)];

  console.log('Merged output (race):');
  for await (const value of asyncMerge(...sources)) {
    console.log('  Item:', value);
  }
}

setTimeout(mergeDemo, 5000);

// =============================================================================
// 7. READABLE STREAM SIMULATION
// =============================================================================

console.log('\n--- Readable Stream Simulation ---');

/**
 * Minimal readable stream over an (async) iterable source.
 *
 * `start()` pumps the source: every chunk is first offered to the piped
 * destination (if any) and then appended to `buffer`. Each async iterator
 * obtained from the stream replays `buffer` from the beginning and then
 * follows new chunks until the stream is closed.
 *
 * Readers that have caught up are parked on a promise and woken on every
 * push and on close, rather than polling on a timer.
 */
class SimpleReadableStream {
  /**
   * @param {AsyncIterable|Iterable} source - Chunk producer consumed by `start()`.
   */
  constructor(source) {
    this.source = source;
    this.buffer = [];
    this.closed = false;
    this.onData = null;
    this.onEnd = null;
    this.waiters = []; // resolve callbacks of readers waiting for more data
  }

  /** Wakes every parked reader so it re-checks `buffer` and `closed`. */
  wakeReaders() {
    const parked = this.waiters;
    this.waiters = [];
    for (const wake of parked) {
      wake();
    }
  }

  /**
   * Pumps the source to completion. The stream is marked closed and `onEnd`
   * is invoked even when the source or the destination throws; the error
   * itself still rejects the returned promise.
   *
   * @returns {Promise<void>}
   */
  async start() {
    try {
      for await (const chunk of this.source) {
        if (this.onData) {
          // Awaiting the destination keeps the pump in step with a slow sink.
          await this.onData(chunk);
        }
        this.buffer.push(chunk);
        this.wakeReaders();
      }
    } finally {
      this.closed = true;
      this.wakeReaders();
      if (this.onEnd) this.onEnd();
    }
  }

  /** Yields buffered chunks in order, then waits for new ones until closed. */
  async *[Symbol.asyncIterator]() {
    let index = 0;

    while (true) {
      if (index < this.buffer.length) {
        yield this.buffer[index++];
      } else if (this.closed) {
        break;
      } else {
        // Caught up with the producer: park until the next push or close.
        await new Promise((resolve) => {
          this.waiters.push(resolve);
        });
      }
    }
  }

  /**
   * Forwards every subsequent chunk to `destination.write` and calls
   * `destination.end()` when the pump finishes.
   *
   * @param {{write: Function, end: Function}} destination
   * @returns {object} `destination`, to allow chaining.
   */
  pipe(destination) {
    this.onData = async (chunk) => {
      await destination.write(chunk);
    };
    this.onEnd = () => destination.end();
    return destination;
  }
}

/**
 * Minimal writable sink: remembers every chunk written to it and notifies
 * optional callbacks on each write and at the end.
 */
class SimpleWritableStream {
  /**
   * @param {object} [options]
   * @param {Function} [options.onWrite] - Called (and awaited) with each chunk.
   * @param {Function} [options.onEnd] - Called by `end()` with all chunks.
   */
  constructor(options = {}) {
    const { onWrite, onEnd } = options;

    this.chunks = [];
    this.onWrite = onWrite ? onWrite : () => {};
    this.onEnd = onEnd ? onEnd : () => {};
  }

  /** Records `chunk`, then waits for the `onWrite` callback to finish. */
  async write(chunk) {
    this.chunks.push(chunk);
    await this.onWrite(chunk);
  }

  /** Signals completion by passing the collected chunks to `onEnd`. */
  end() {
    const { chunks } = this;
    this.onEnd(chunks);
  }

  /** @returns {Array} The live array of chunks written so far. */
  getResult() {
    const { chunks } = this;
    return chunks;
  }
}

// =============================================================================
// 8. TRANSFORM STREAM SIMULATION
// =============================================================================

console.log('\n--- Transform Stream ---');

/**
 * Minimal transform stage: every chunk written to it is passed through
 * `transformFn` and the result is forwarded to the piped destination.
 * Chunks written before `pipe()` is called are transformed but not forwarded.
 */
class SimpleTransformStream {
  /**
   * @param {Function} transformFn - `(chunk) => transformed`; may be async.
   */
  constructor(transformFn) {
    // Transform first, then forward only when a destination is attached.
    const forward = async (chunk) => {
      const transformed = await this.transformFn(chunk);
      if (!this.output) return;
      await this.output.write(transformed);
    };

    // Propagate the end-of-stream signal downstream.
    const finish = () => {
      if (!this.output) return;
      this.output.end();
    };

    this.transformFn = transformFn;
    this.readable = null;
    this.writable = new SimpleWritableStream({
      onWrite: forward,
      onEnd: finish,
    });
    this.output = null;
  }

  /**
   * Attaches the destination that receives transformed chunks.
   *
   * @param {{write: Function, end: Function}} destination
   * @returns {object} `destination`, to allow chaining.
   */
  pipe(destination) {
    this.output = destination;
    return destination;
  }

  /** Feeds one chunk into the stage; resolves once it has been forwarded. */
  write(chunk) {
    return this.writable.write(chunk);
  }

  /** Ends the stage, which in turn ends the destination (if any). */
  end() {
    this.writable.end();
  }
}

/**
 * Demo: pushes three records through an upper-casing transform stage into
 * a logging sink, then ends the pipeline.
 *
 * @returns {Promise<void>}
 */
async function pipelineDemo() {
  console.log('Pipeline demonstration:');

  // Source of raw records
  async function* dataSource() {
    yield { id: 1, value: 'hello' };
    yield { id: 2, value: 'world' };
    yield { id: 3, value: 'async' };
  }

  // Transform stage: copy the record with its value upper-cased
  const toUpperCase = async (chunk) => ({
    ...chunk,
    value: chunk.value.toUpperCase(),
  });
  const uppercaser = new SimpleTransformStream(toUpperCase);

  // Sink: log every chunk, and the total when the stream ends
  const output = new SimpleWritableStream({
    onWrite(chunk) {
      console.log('  Output:', chunk);
    },
    onEnd(chunks) {
      console.log('  Total chunks:', chunks.length);
    },
  });

  // Wire transform -> sink
  uppercaser.pipe(output);

  // Feed the records one at a time, waiting for each to be processed
  for await (const record of dataSource()) {
    await uppercaser.write(record);
  }
  uppercaser.end();
}

setTimeout(pipelineDemo, 6000);

// =============================================================================
// 9. BACKPRESSURE HANDLING
// =============================================================================

console.log('\n--- Backpressure Handling ---');

/**
 * Single-producer / single-consumer queue with backpressure.
 *
 * `write()` resolves immediately while the buffer holds fewer than
 * `highWaterMark` chunks and otherwise stays pending until a `read()` frees
 * space. `read()` resolves with the oldest buffered chunk, or waits for the
 * next write when the buffer is empty. Writing `null` marks the end of the
 * stream for async iteration.
 *
 * Only one pending reader (`waiting`) and one blocked writer
 * (`drainResolve`) are tracked at a time.
 */
class BackpressureStream {
  /**
   * @param {object} [options]
   * @param {number} [options.highWaterMark=5] - Buffer length at which
   *   writers start to wait.
   */
  constructor(options = {}) {
    this.highWaterMark = options.highWaterMark || 5;
    this.buffer = [];
    this.waiting = null; // resolve callback of a reader waiting for data
    this.drainResolve = null; // resolve callback of a writer waiting for space
  }

  /**
   * Delivers one chunk.
   *
   * @param {*} chunk - Value to deliver; `null` signals end of stream.
   * @returns {Promise<void>} Resolves once the chunk is accepted and the
   *   buffer is below the high-water mark.
   */
  async write(chunk) {
    // A reader is already waiting (so the buffer is empty): hand the chunk
    // over directly. Buffering it as well would deliver it a second time on
    // the next read().
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve(chunk);
      return;
    }

    this.buffer.push(chunk);

    // Apply backpressure if buffer is full
    if (this.buffer.length >= this.highWaterMark) {
      console.log('  Buffer full, applying backpressure');
      await new Promise((resolve) => {
        this.drainResolve = resolve;
      });
    }
  }

  /**
   * Takes the oldest chunk, waiting for a write if none is buffered.
   *
   * @returns {Promise<*>} The next chunk.
   */
  async read() {
    if (this.buffer.length > 0) {
      const chunk = this.buffer.shift();

      // Space was freed: release a writer blocked on a full buffer.
      if (this.drainResolve && this.buffer.length < this.highWaterMark) {
        const resolve = this.drainResolve;
        this.drainResolve = null;
        resolve();
      }

      return chunk;
    }

    // Wait for data
    return new Promise((resolve) => {
      this.waiting = resolve;
    });
  }

  /** Yields chunks in write order until the `null` end marker is read. */
  async *[Symbol.asyncIterator]() {
    while (true) {
      const chunk = await this.read();
      if (chunk === null) break;
      yield chunk;
    }
  }
}

/**
 * Demo: a fast producer writes 1..10 (then the `null` end marker) into a
 * `BackpressureStream` with `highWaterMark: 3` while a slow consumer drains
 * it.
 *
 * Producer and consumer run concurrently and are awaited together, so a
 * failure on either side rejects this function instead of surfacing as an
 * unhandled rejection from a detached task.
 *
 * @returns {Promise<void>} Resolves once both sides have finished.
 */
async function backpressureDemo() {
  console.log('Backpressure demonstration:');

  const stream = new BackpressureStream({ highWaterMark: 3 });

  // Fast producer
  const produce = async () => {
    for (let i = 1; i <= 10; i++) {
      console.log(`  Producing: ${i}`);
      await stream.write(i);
    }
    await stream.write(null); // Signal end
  };

  // Slow consumer
  const consume = async () => {
    for await (const item of stream) {
      console.log(`  Consumed: ${item}`);
      await delay(100); // Slow consumer
    }
  };

  // The producer is started first, matching the order the output is expected in.
  await Promise.all([produce(), consume()]);
}

setTimeout(backpressureDemo, 7000);

// =============================================================================
// 10. EVENT STREAM
// =============================================================================

console.log('\n--- Event Stream ---');

/**
 * Push-based event source that can be consumed with `for await...of`.
 *
 * Events emitted while a consumer is waiting are delivered to it directly;
 * otherwise they are buffered until requested. After `close()`, buffered
 * events are still delivered, then iteration ends.
 */
class EventStream {
  constructor() {
    this.listeners = []; // consumers waiting for an event: { resolve }
    this.buffer = []; // events nobody has asked for yet
    this.closed = false;
  }

  /**
   * Publishes an event. Ignored once the stream is closed.
   *
   * @param {*} event
   */
  emit(event) {
    if (this.closed) return;

    const waiting = this.listeners.shift();
    if (waiting === undefined) {
      this.buffer.push(event);
      return;
    }
    waiting.resolve({ value: event, done: false });
  }

  /** Closes the stream and completes every consumer that is still waiting. */
  close() {
    this.closed = true;

    const waiting = this.listeners;
    this.listeners = [];
    waiting.forEach(({ resolve }) => {
      resolve({ value: undefined, done: true });
    });
  }

  /** @returns {{next: Function}} An async iterator over the emitted events. */
  [Symbol.asyncIterator]() {
    const stream = this;

    return {
      next() {
        // Buffered events go first, even after close().
        if (stream.buffer.length > 0) {
          const value = stream.buffer.shift();
          return Promise.resolve({ value, done: false });
        }

        if (stream.closed) {
          return Promise.resolve({ value: undefined, done: true });
        }

        // Nothing available yet: wait for the next emit() or close().
        return new Promise((resolve) => {
          stream.listeners.push({ resolve });
        });
      },
    };
  }
}

/**
 * Demo: schedules three events and a close on an `EventStream`, and logs
 * each event as the `for await` loop receives it.
 *
 * @returns {Promise<void>} Resolves after the stream has been closed.
 */
async function eventStreamDemo() {
  console.log('Event stream demonstration:');

  const events = new EventStream();

  // Emit events over time, then close the stream
  const timeline = [
    [100, { type: 'click', x: 10, y: 20 }],
    [200, { type: 'click', x: 30, y: 40 }],
    [300, { type: 'move', x: 50, y: 60 }],
  ];
  for (const [at, payload] of timeline) {
    setTimeout(() => events.emit(payload), at);
  }
  setTimeout(() => events.close(), 400);

  // Consume events until the stream reports completion
  for await (const event of events) {
    console.log('  Event:', event);
  }
  console.log('  Event stream closed');
}

setTimeout(eventStreamDemo, 8500);

console.log(
  '\n=== Examples will execute over time. Watch the output above. ==='
);
// Examples - JavaScript Tutorial | DeepML