// examples/examples.js (javascript)
/**
* Async Iterators & Streams - Examples
* Working with data that arrives over time
*/
/**
 * Builds a promise that fulfills, with no value, once a `setTimeout` of `ms`
 * milliseconds fires.
 *
 * @param {number} ms - Timeout handed straight to `setTimeout`.
 * @returns {Promise<undefined>} Promise that settles when the timer fires.
 */
const delay = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });
// =============================================================================
// 1. BASIC ASYNC ITERATOR
// =============================================================================
console.log('--- Basic Async Iterator ---');
// Creating an async iterable object
/**
 * Demo object usable with `for await...of`: its async iterator walks `data`
 * in order and waits 100 ms before handing out each value.
 */
const simpleAsyncIterable = {
  data: [1, 2, 3, 4, 5],

  [Symbol.asyncIterator]: async function* () {
    for (const value of this.data) {
      // Stand-in for an asynchronous fetch.
      await delay(100);
      yield value;
    }
  },
};
// Using for await...of
/**
 * Drains `simpleAsyncIterable` with `for await...of`, logging a start line,
 * one line per received value, and a completion line.
 *
 * @returns {Promise<void>} Settles once the iterable is exhausted.
 */
async function consumeAsyncIterable() {
  console.log('Starting async iteration...');

  for await (const value of simpleAsyncIterable) {
    console.log('Received:', value);
  }

  console.log('Async iteration complete');
}
consumeAsyncIterable();
// =============================================================================
// 2. ASYNC GENERATOR FUNCTIONS
// =============================================================================
console.log('\n--- Async Generator Functions ---');
/**
 * Async generator that counts from `start` up to and including `end`,
 * waiting `delayMs` milliseconds before each number is yielded.
 *
 * @param {number} start - First number to yield.
 * @param {number} end - Last number to yield (inclusive).
 * @param {number} [delayMs=100] - Pause before every yielded value.
 * @yields {number} The next number in the range.
 */
async function* countAsync(start, end, delayMs = 100) {
  let current = start;
  while (current <= end) {
    await delay(delayMs);
    yield current;
    current++;
  }
}
// Using the async generator
/**
 * Logs a heading, then every number produced by `countAsync(1, 5, 50)`.
 *
 * @returns {Promise<void>} Settles once the generator is exhausted.
 */
async function useAsyncGenerator() {
  console.log('Counting with async generator:');

  const counter = countAsync(1, 5, 50);
  for await (const value of counter) {
    console.log('Count:', value);
  }
}
setTimeout(useAsyncGenerator, 1000);
// =============================================================================
// 3. PAGINATED API SIMULATION
// =============================================================================
console.log('\n--- Paginated API ---');
// Simulate a paginated API
/**
 * In-memory stand-in for a paginated API holding 25 records of the form
 * `{ id, name }`, with ids running from 1 to 25.
 */
const mockDatabase = {
  items: Array.from({ length: 25 }, (_, index) => {
    const id = index + 1;
    return { id, name: `Item ${id}` };
  }),

  /**
   * Returns one page of records after a 100 ms simulated network delay.
   *
   * @param {number} page - Page number; page 1 starts at the first record.
   * @param {number} [pageSize=10] - Records per page.
   * @returns {Promise<{items: object[], page: number, hasMore: boolean}>}
   *   The page slice, the requested page number, and whether records remain
   *   beyond this page.
   */
  async fetch(page, pageSize = 10) {
    await delay(100);

    const firstIndex = (page - 1) * pageSize;
    const endIndex = firstIndex + pageSize;

    return {
      items: this.items.slice(firstIndex, endIndex),
      page,
      hasMore: endIndex < this.items.length,
    };
  },
};
/**
 * Async generator that walks `mockDatabase` page by page, starting at page 1,
 * and yields every record individually. A log line is printed per fetched
 * page; fetching stops after the first page whose `hasMore` is falsy.
 *
 * @param {number} [pageSize=10] - Page size forwarded to `mockDatabase.fetch`.
 * @yields {object} The next record from the current page.
 */
async function* fetchAllPages(pageSize = 10) {
  let page = 0;
  let response;

  do {
    page += 1;
    response = await mockDatabase.fetch(page, pageSize);
    console.log(`Fetched page ${page} (${response.items.length} items)`);

    for (const record of response.items) {
      yield record;
    }
  } while (response.hasMore);
}
/**
 * Collects every record produced by `fetchAllPages(10)` into an array,
 * logging a heading first and the total count at the end.
 *
 * @returns {Promise<object[]>} All fetched records, in arrival order.
 */
async function getAllItems() {
  console.log('Fetching all items with async generator:');

  const collected = [];
  for await (const record of fetchAllPages(10)) {
    collected.push(record);
  }

  console.log(`Total items fetched: ${collected.length}`);
  return collected;
}
setTimeout(getAllItems, 2000);
// =============================================================================
// 4. TRANSFORMING ASYNC ITERATORS
// =============================================================================
console.log('\n--- Transforming Async Iterators ---');
// Map for async iterables
/**
 * Lazily applies `fn` to each value of `iterable` and yields the awaited
 * result, one value at a time.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} fn - Mapper; may return a value or a promise.
 * @yields {*} The awaited result of `fn` for each source value.
 */
async function* asyncMap(iterable, fn) {
  for await (const input of iterable) {
    const output = await fn(input);
    yield output;
  }
}
// Filter for async iterables
/**
 * Lazily yields only those values of `iterable` for which the awaited result
 * of `predicate` is truthy.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may return a value or a promise.
 * @yields {*} Each source value that passes the test.
 */
async function* asyncFilter(iterable, predicate) {
  for await (const candidate of iterable) {
    const keep = await predicate(candidate);
    if (!keep) continue;
    yield candidate;
  }
}
// Take first n items
/**
 * Yields at most the first `n` values of `iterable`, then stops.
 *
 * The limit is checked right after each value is handed out, so the source is
 * never asked for an (n+1)-th value. That matters when every value is
 * expensive or slow to produce. Leaving the `for await` loop early closes the
 * source iterator via its `return()` method.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {number} n - Maximum number of values to yield.
 * @yields {*} The next value from the source, up to `n` values.
 */
async function* asyncTake(iterable, n) {
  // Nothing requested: finish without touching the source at all.
  if (n <= 0) return;

  let count = 0;
  for await (const item of iterable) {
    yield item;
    count++;
    // Stop before the loop requests another value from the source.
    if (count >= n) return;
  }
}
// Skip first n items
/**
 * Lazily drops the first `n` values of `iterable` and yields everything that
 * follows.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {number} n - Number of leading values to discard.
 * @yields {*} Each source value after the first `n`.
 */
async function* asyncSkip(iterable, n) {
  let position = 0;
  for await (const value of iterable) {
    const pastSkipped = position >= n;
    position++;
    if (pastSkipped) {
      yield value;
    }
  }
}
// Demonstration
/**
 * Chains the transform helpers over `countAsync(1, 10, 30)`: double every
 * number, keep the results greater than 10, take three of them, and log each
 * one that comes out of the pipeline.
 *
 * @returns {Promise<void>} Settles once the pipeline is drained.
 */
async function transformDemo() {
  console.log('Transform demonstration:');

  const double = (n) => n * 2;
  const isAboveTen = (n) => n > 10;

  // Generators are lazy: nothing runs until the loop below starts pulling.
  const pipeline = asyncTake(
    asyncFilter(asyncMap(countAsync(1, 10, 30), double), isAboveTen),
    3
  );

  for await (const value of pipeline) {
    console.log('Result:', value);
  }
}
setTimeout(transformDemo, 4000);
// =============================================================================
// 5. ASYNC ITERATOR UTILITIES
// =============================================================================
console.log('\n--- Async Iterator Utilities ---');
// Collect all items into array
/**
 * Drains `iterable` and gathers every value into a new array, in order.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @returns {Promise<Array>} All values the source produced.
 */
async function asyncToArray(iterable) {
  const collected = [];
  for await (const value of iterable) {
    collected.push(value);
  }
  return collected;
}
// Reduce async iterable
/**
 * Folds `iterable` into a single value by awaiting
 * `reducer(accumulator, item)` for each item in order, starting from
 * `initial`.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} reducer - Combines accumulator and item; may be async.
 * @param {*} initial - Starting accumulator; returned as-is for an empty source.
 * @returns {Promise<*>} The final accumulator.
 */
async function asyncReduce(iterable, reducer, initial) {
  let running = initial;
  for await (const value of iterable) {
    running = await reducer(running, value);
  }
  return running;
}
// Find first matching item
/**
 * Returns the first value of `iterable` whose awaited `predicate` result is
 * truthy, and stops consuming the source at that point.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may return a value or a promise.
 * @returns {Promise<*>} The matching value, or `undefined` when none match.
 */
async function asyncFind(iterable, predicate) {
  for await (const candidate of iterable) {
    const matched = await predicate(candidate);
    if (!matched) continue;
    return candidate;
  }
  return undefined;
}
// Check if any item matches
/**
 * Reports whether at least one value of `iterable` satisfies `predicate`.
 * Consumption stops at the first match.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may return a value or a promise.
 * @returns {Promise<boolean>} `true` on the first match, `false` if none match.
 */
async function asyncSome(iterable, predicate) {
  for await (const candidate of iterable) {
    const matched = await predicate(candidate);
    if (matched) return true;
  }
  return false;
}
// Check if all items match
/**
 * Reports whether every value of `iterable` satisfies `predicate`.
 * Consumption stops at the first value that fails the test.
 *
 * @param {AsyncIterable|Iterable} iterable - Source consumed with `for await`.
 * @param {Function} predicate - Test; may return a value or a promise.
 * @returns {Promise<boolean>} `false` on the first failure, otherwise `true`
 *   (including for an empty source).
 */
async function asyncEvery(iterable, predicate) {
  for await (const candidate of iterable) {
    const passed = await predicate(candidate);
    if (!passed) return false;
  }
  return true;
}
// =============================================================================
// 6. MERGING ASYNC ITERATORS
// =============================================================================
console.log('\n--- Merging Async Iterators ---');
// Merge multiple async iterators (race style)
/**
 * Merges several async iterables into one stream, yielding each value as soon
 * as any source produces it. Output order follows arrival order, not argument
 * order. The merged stream ends once every source has ended.
 *
 * One `next()` call is kept in flight per source. If a source rejects, the
 * merge rethrows that error. Whenever the merge ends before all sources are
 * exhausted (consumer `break`/`return`, or an error), the remaining source
 * iterators are asked to close through their `return()` method, if they have
 * one.
 *
 * @param {...AsyncIterable} iterables - Sources exposing `Symbol.asyncIterator`.
 * @yields {*} The next value from whichever source delivered first.
 */
async function* asyncMerge(...iterables) {
  const iterators = iterables.map((iterable) =>
    iterable[Symbol.asyncIterator]()
  );
  // index -> in-flight next() promise. Keyed by index so a settled entry can
  // be removed directly instead of scanning for it.
  const pending = new Map();
  // Indices of sources that have neither finished nor failed yet.
  const open = new Set(iterators.keys());

  // Rejections are folded into the fulfilled value so that a promise still in
  // flight when the merge exits can never become an unhandled rejection.
  const requestNext = (index) => {
    pending.set(
      index,
      iterators[index].next().then(
        (result) => ({ index, result, failed: false }),
        (error) => ({ index, error, failed: true })
      )
    );
  };

  try {
    for (const index of open) {
      requestNext(index);
    }

    while (pending.size > 0) {
      const settled = await Promise.race(pending.values());
      pending.delete(settled.index);

      if (settled.failed) {
        open.delete(settled.index);
        throw settled.error;
      }
      if (settled.result.done) {
        open.delete(settled.index);
        continue;
      }

      yield settled.result.value;
      // Queue up the next value from the source that just delivered.
      requestNext(settled.index);
    }
  } finally {
    // Best-effort cleanup. The close requests are deliberately not awaited:
    // a source may still have a slow next() in flight, and return() queues
    // behind it, so waiting here could stall the consumer. Errors raised while
    // closing are ignored so they cannot mask whatever ended the merge.
    for (const index of open) {
      const iterator = iterators[index];
      if (typeof iterator.return === 'function') {
        Promise.resolve(iterator.return()).catch(() => {});
      }
    }
  }
}
// Zip multiple async iterators
/**
 * Advances all sources in lock-step and yields one array per round holding
 * the value from each source, in argument order. The zip ends as soon as any
 * source is exhausted; values produced by the other sources in that final
 * round are discarded.
 *
 * Called with no sources it yields nothing. Without that guard every round
 * would be an empty array and the loop would never end. When the zip ends,
 * for any reason, sources that have not reported `done` are closed through
 * their `return()` method, if they have one.
 *
 * @param {...AsyncIterable} iterables - Sources exposing `Symbol.asyncIterator`.
 * @yields {Array} One value from each source for the current round.
 */
async function* asyncZip(...iterables) {
  const iterators = iterables.map((iterable) =>
    iterable[Symbol.asyncIterator]()
  );
  if (iterators.length === 0) return;

  // Indices of sources that have reported `done`.
  const exhausted = new Set();

  try {
    while (true) {
      const results = await Promise.all(
        iterators.map((iterator) => iterator.next())
      );
      results.forEach((result, index) => {
        if (result.done) exhausted.add(index);
      });
      if (exhausted.size > 0) return;

      yield results.map((result) => result.value);
    }
  } finally {
    // Close whatever is still open. Errors raised while closing are ignored
    // so they cannot mask the completion or error that ended the zip.
    const closing = [];
    iterators.forEach((iterator, index) => {
      if (!exhausted.has(index) && typeof iterator.return === 'function') {
        closing.push(Promise.resolve(iterator.return()).catch(() => {}));
      }
    });
    await Promise.all(closing);
  }
}
// Demonstration
/**
 * Merges a fast counter (1..3, 50 ms apart) with a slow one (10..12, 150 ms
 * apart) through `asyncMerge` and logs each value as it arrives.
 *
 * @returns {Promise<void>} Settles once the merged stream ends.
 */
async function mergeDemo() {
  console.log('Merge demonstration:');

  const sources = [countAsync(1, 3, 50), countAsync(10, 12, 150)];

  console.log('Merged output (race):');
  for await (const value of asyncMerge(...sources)) {
    console.log(' Item:', value);
  }
}
setTimeout(mergeDemo, 5000);
// =============================================================================
// 7. READABLE STREAM SIMULATION
// =============================================================================
console.log('\n--- Readable Stream Simulation ---');
/**
 * Minimal readable stream wrapped around an (async) iterable source.
 *
 * `start()` pumps the source: each chunk is passed to `onData` (when set) and
 * then stored in `buffer`. Iterating the stream replays `buffer` from the
 * beginning and keeps polling for new chunks until the stream is closed.
 */
class SimpleReadableStream {
  /**
   * @param {AsyncIterable|Iterable} source - Where chunks are read from.
   */
  constructor(source) {
    this.source = source;
    this.buffer = [];
    this.closed = false;
    this.onData = null;
    this.onEnd = null;
  }

  /**
   * Reads the whole source. Whether the source finishes or throws, the stream
   * is marked closed and `onEnd` (when set) is invoked.
   *
   * @returns {Promise<void>} Settles when the source is exhausted; rejects
   *   with whatever the source or `onData` threw.
   */
  async start() {
    try {
      for await (const chunk of this.source) {
        // The data handler is awaited before the chunk becomes visible to
        // iterators through the buffer.
        if (this.onData) {
          await this.onData(chunk);
        }
        this.buffer.push(chunk);
      }
    } finally {
      this.closed = true;
      if (this.onEnd) {
        this.onEnd();
      }
    }
  }

  /**
   * Yields buffered chunks in order. When it has caught up and the stream is
   * still open, it sleeps 10 ms and checks again.
   */
  async *[Symbol.asyncIterator]() {
    for (let cursor = 0; ; ) {
      if (cursor < this.buffer.length) {
        const chunk = this.buffer[cursor];
        cursor++;
        yield chunk;
        continue;
      }
      if (this.closed) {
        return;
      }
      await delay(10); // Wait for more data
    }
  }

  /**
   * Routes chunks to `destination.write()` and end-of-stream to
   * `destination.end()` by installing the `onData` / `onEnd` handlers.
   * It does not start reading; call `start()` for that.
   *
   * @param {{write: Function, end: Function}} destination - Chunk receiver.
   * @returns {object} The same `destination`, to allow chaining.
   */
  pipe(destination) {
    const forwardChunk = async (chunk) => {
      await destination.write(chunk);
    };
    const forwardEnd = () => destination.end();

    this.onData = forwardChunk;
    this.onEnd = forwardEnd;
    return destination;
  }
}
/**
 * Minimal writable stream that records every chunk it receives and notifies
 * optional callbacks on write and on end.
 */
class SimpleWritableStream {
  /**
   * @param {object} [options]
   * @param {Function} [options.onWrite] - Called (and awaited) with each chunk.
   * @param {Function} [options.onEnd] - Called by `end()` with all chunks.
   */
  constructor(options = {}) {
    const { onWrite, onEnd } = options;

    this.chunks = [];
    // Fall back to do-nothing handlers when a callback is not supplied.
    this.onWrite = onWrite || (() => {});
    this.onEnd = onEnd || (() => {});
  }

  /**
   * Stores the chunk, then awaits the `onWrite` callback.
   *
   * @param {*} chunk - Value to record.
   * @returns {Promise<void>} Settles after `onWrite` has settled.
   */
  async write(chunk) {
    this.chunks.push(chunk);
    await this.onWrite(chunk);
  }

  /** Invokes `onEnd` with the array of chunks recorded so far. */
  end() {
    this.onEnd(this.chunks);
  }

  /**
   * @returns {Array} The internal array of recorded chunks (not a copy).
   */
  getResult() {
    return this.chunks;
  }
}
// =============================================================================
// 8. TRANSFORM STREAM SIMULATION
// =============================================================================
console.log('\n--- Transform Stream ---');
/**
 * Minimal transform stage: every chunk written to it is passed through
 * `transformFn` and the result is forwarded to the destination registered
 * with `pipe()`. With no destination registered, transformed chunks are
 * dropped.
 */
class SimpleTransformStream {
  /**
   * @param {Function} transformFn - Maps an input chunk to an output chunk;
   *   may return a promise.
   */
  constructor(transformFn) {
    this.transformFn = transformFn;
    this.readable = null;

    // Handlers for the internal writable side. They read `this.output` at call
    // time, so a destination attached later through pipe() is picked up.
    const forwardTransformed = async (chunk) => {
      const transformed = await this.transformFn(chunk);
      if (!this.output) return;
      await this.output.write(transformed);
    };
    const forwardEnd = () => {
      if (!this.output) return;
      this.output.end();
    };

    this.writable = new SimpleWritableStream({
      onWrite: forwardTransformed,
      onEnd: forwardEnd,
    });
    this.output = null;
  }

  /**
   * Registers where transformed chunks go.
   *
   * @param {{write: Function, end: Function}} destination - Chunk receiver.
   * @returns {object} The same `destination`, to allow chaining.
   */
  pipe(destination) {
    this.output = destination;
    return destination;
  }

  /**
   * Feeds one chunk into the transform.
   *
   * @param {*} chunk - Input chunk.
   * @returns {Promise<void>} The promise from the internal writable's `write`.
   */
  write(chunk) {
    return this.writable.write(chunk);
  }

  /** Signals end of input; this is passed on to the destination, if any. */
  end() {
    this.writable.end();
  }
}
// Pipeline demonstration
/**
 * Wires a three-record source through an upper-casing transform into a
 * logging writable, then feeds the records one at a time and ends the
 * pipeline.
 *
 * @returns {Promise<void>} Settles after the last record has been written and
 *   the transform has been ended.
 */
async function pipelineDemo() {
  console.log('Pipeline demonstration:');

  // Source: records with ids 1..3 carrying lower-case words.
  const words = ['hello', 'world', 'async'];
  async function* dataSource() {
    let id = 0;
    for (const value of words) {
      id += 1;
      yield { id, value };
    }
  }

  // Transform: copy the record with its `value` upper-cased.
  const toUpperCaseValue = async (chunk) => {
    const value = chunk.value.toUpperCase();
    return { ...chunk, value };
  };
  const uppercaser = new SimpleTransformStream(toUpperCaseValue);

  // Destination: log every chunk and the total count at the end.
  const logChunk = (chunk) => console.log(' Output:', chunk);
  const logTotal = (chunks) => console.log(' Total chunks:', chunks.length);
  const output = new SimpleWritableStream({
    onWrite: logChunk,
    onEnd: logTotal,
  });

  uppercaser.pipe(output);

  for await (const record of dataSource()) {
    await uppercaser.write(record);
  }
  uppercaser.end();
}
setTimeout(pipelineDemo, 6000);
// =============================================================================
// 9. BACKPRESSURE HANDLING
// =============================================================================
console.log('\n--- Backpressure Handling ---');
/**
 * Small producer/consumer queue with backpressure.
 *
 * `write()` queues a chunk and, once the queue holds `highWaterMark` chunks,
 * does not resolve until a `read()` brings the queue back below that mark.
 * `read()` returns the oldest queued chunk, or waits for the next write when
 * the queue is empty.
 *
 * Only one waiting reader (`waiting`) and one blocked writer (`drainResolve`)
 * are tracked at a time, so the class suits a single producer paired with a
 * single consumer. Writing `null` is the end-of-stream signal understood by
 * the async iterator.
 */
class BackpressureStream {
  /**
   * @param {object} [options]
   * @param {number} [options.highWaterMark] - Queue length at which writers
   *   start to block; falls back to 5 when missing or falsy.
   */
  constructor(options = {}) {
    this.highWaterMark = options.highWaterMark || 5;
    this.buffer = [];
    this.waiting = null;
    this.drainResolve = null;
  }

  /**
   * Delivers a chunk to the consumer side.
   *
   * @param {*} chunk - Value to deliver (`null` ends iteration).
   * @returns {Promise<void>} Resolves at once when a reader took the chunk or
   *   the queue has room; otherwise resolves after a later `read()` drains
   *   the queue below `highWaterMark`.
   */
  async write(chunk) {
    // A reader is already waiting: hand the chunk over directly. It must NOT
    // also be queued, or the next read() would deliver the same chunk again.
    if (this.waiting) {
      const resolve = this.waiting;
      this.waiting = null;
      resolve(chunk);
      return;
    }

    this.buffer.push(chunk);

    // Apply backpressure if buffer is full
    if (this.buffer.length >= this.highWaterMark) {
      console.log(' Buffer full, applying backpressure');
      await new Promise((resolve) => {
        this.drainResolve = resolve;
      });
    }
  }

  /**
   * Takes the next chunk.
   *
   * @returns {Promise<*>} The oldest queued chunk, or the chunk of the next
   *   `write()` when the queue is currently empty.
   */
  async read() {
    if (this.buffer.length > 0) {
      const chunk = this.buffer.shift();
      // Signal a blocked writer that there is space again.
      if (this.drainResolve && this.buffer.length < this.highWaterMark) {
        const resolve = this.drainResolve;
        this.drainResolve = null;
        resolve();
      }
      return chunk;
    }

    // Queue is empty: wait until write() hands a chunk over.
    return new Promise((resolve) => {
      this.waiting = resolve;
    });
  }

  /** Yields chunks from `read()` until a `null` chunk is received. */
  async *[Symbol.asyncIterator]() {
    while (true) {
      const chunk = await this.read();
      if (chunk === null) break;
      yield chunk;
    }
  }
}
/**
 * Runs a fast producer against a slow consumer on a `BackpressureStream`
 * with a high-water mark of 3. The producer writes 1..10 followed by the
 * `null` end signal; the consumer logs each item and then pauses 100 ms.
 *
 * @returns {Promise<void>} Settles when the consumer has seen the end signal.
 */
async function backpressureDemo() {
  console.log('Backpressure demonstration:');
  const stream = new BackpressureStream({ highWaterMark: 3 });

  // Fast producer
  const produce = async () => {
    let value = 1;
    while (value <= 10) {
      console.log(` Producing: ${value}`);
      await stream.write(value);
      value++;
    }
    stream.write(null); // Signal end
  };
  // Started but not awaited, so it runs alongside the consumer loop below.
  produce();

  // Slow consumer
  for await (const received of stream) {
    console.log(` Consumed: ${received}`);
    await delay(100);
  }
}
setTimeout(backpressureDemo, 7000);
// =============================================================================
// 10. EVENT STREAM
// =============================================================================
console.log('\n--- Event Stream ---');
/**
 * Push-based event source that can be consumed with `for await...of`.
 *
 * Events emitted while a consumer is waiting go straight to that consumer.
 * Events emitted with nobody waiting are queued in `buffer`. After `close()`,
 * further events are ignored and consumers see the end of the stream once the
 * queue is empty.
 */
class EventStream {
  constructor() {
    this.listeners = [];
    this.buffer = [];
    this.closed = false;
  }

  /**
   * Delivers an event to the longest-waiting consumer, or queues it.
   *
   * @param {*} event - Event payload; ignored when the stream is closed.
   */
  emit(event) {
    if (this.closed) return;

    const waiter = this.listeners.shift();
    if (waiter === undefined) {
      this.buffer.push(event);
      return;
    }
    waiter.resolve({ value: event, done: false });
  }

  /** Closes the stream and tells every waiting consumer that it is done. */
  close() {
    this.closed = true;

    const waiters = this.listeners;
    this.listeners = [];
    for (const waiter of waiters) {
      waiter.resolve({ value: undefined, done: true });
    }
  }

  /**
   * @returns {{next: Function}} Iterator whose `next()` resolves with a queued
   *   event, with the done marker once the stream is closed, or otherwise
   *   stays pending until the next `emit()` or `close()`.
   */
  [Symbol.asyncIterator]() {
    const next = () => {
      // Queued events are drained first, even after close().
      if (this.buffer.length > 0) {
        const value = this.buffer.shift();
        return Promise.resolve({ value, done: false });
      }

      if (this.closed) {
        return Promise.resolve({ value: undefined, done: true });
      }

      return new Promise((resolve) => {
        this.listeners.push({ resolve });
      });
    };

    return { next };
  }
}
/**
 * Schedules three events on an `EventStream` at 100, 200 and 300 ms, closes
 * the stream at 400 ms, and meanwhile logs every event the `for await` loop
 * receives.
 *
 * @returns {Promise<void>} Settles after the stream has been closed.
 */
async function eventStreamDemo() {
  console.log('Event stream demonstration:');
  const events = new EventStream();

  // Emit events over time
  const schedule = [
    [100, { type: 'click', x: 10, y: 20 }],
    [200, { type: 'click', x: 30, y: 40 }],
    [300, { type: 'move', x: 50, y: 60 }],
  ];
  for (const [afterMs, event] of schedule) {
    setTimeout(() => events.emit(event), afterMs);
  }
  setTimeout(() => events.close(), 400);

  // Consume events
  for await (const received of events) {
    console.log(' Event:', received);
  }

  console.log(' Event stream closed');
}
setTimeout(eventStreamDemo, 8500);
console.log(
'\n=== Examples will execute over time. Watch the output above. ==='
);