Non-Local Control Flow

Setup

import os
BASE_URL = os.getenv("CS4973_BASE_URL")
API_KEY = os.getenv("CS4973_API_KEY")

import random
import asyncio
import aiohttp

from openai import OpenAI, AsyncOpenAI
from transformers import AutoTokenizer
import datasets

Generators

Introduction

A generator function is a special kind of function that suspends its execution when it produces a value for the caller. The caller may then resume the generator function to make it produce the next value (if any). Ordinary functions do not work this way: they run to completion and cannot be suspended.

def make_three_gen():
    yield 1
    yield 2
    yield 3

When you apply a generator function, you get a generator object that keeps track of where the generator is suspended. You can use the built-in function next() to resume the generator, get the next value, and suspend it again immediately after it produces that value.

gen_obj = make_three_gen()
print(next(gen_obj))
print(next(gen_obj))
print(next(gen_obj))
# If we call next() again, the generator will raise a StopIteration exception.

1
2
3
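
To see the StopIteration mentioned in the comment above, we can call next() once more and catch the exception. A quick sketch:

try:
    next(gen_obj)
except StopIteration:
    print("the generator is exhausted")

the generator is exhausted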

This is not how generators are typically used. In Python, it is more typical to use them in for loops, which call next() for you automatically.

for value in make_three_gen():
    print(value)
1
2
3
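
One gotcha: a generator object can be consumed only once. A second loop over the same object yields nothing, which is why we call make_three_gen() again above to get a fresh generator. A quick sketch:

gen_obj = make_three_gen()
print(list(gen_obj))
print(list(gen_obj))

[1, 2, 3]
[]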

Unbounded Generators

Generators really do suspend execution, including suspending an infinite loop. Consider the following example of a generator that has an infinite loop.

def all_even_numbers():
    num = 0
    while True:
        yield num
        num += 2

We can get a finite prefix of even numbers.

even_gen_obj = all_even_numbers()
print(next(even_gen_obj))
print(next(even_gen_obj))
print(next(even_gen_obj))
# We could do this forever.
0
2
4

Using a for loop is more canonical Python. However, it is important to include the break statement to avoid looping forever.

for n in all_even_numbers():
    print(n)
    if n == 4:
        break

0
2
4
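
If you know in advance how many values you want, itertools.islice from the standard library bounds the generator for you and avoids the explicit break:

from itertools import islice

for n in islice(all_even_numbers(), 3):
    print(n)

0
2
4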

Generators as Coroutines

It is possible to have several generators suspended at once, which means we can actually use multiple generators together.

def nums_from(n: int):
    while True:
        yield n
        n += 1


g0 = nums_from(10)
g1 = nums_from(100)

print(next(g0), next(g1))
print(next(g0), next(g1))
print(next(g0), next(g1))

10 100
11 101
12 102

Using next directly gives us fine-grained control. But, a more canonical way to write the code above is with a for loop and zip.

for n0, n1 in zip(nums_from(10), nums_from(100)):
    print(n0, n1)
    if n0 == 12:
        break

10 100
11 101
12 102

Here is an alternative implementation that uses zip and enumerate:

for i, (n0, n1) in enumerate(zip(nums_from(10), nums_from(100))):
    print(n0, n1)
    if i == 2:
        break
10 100
11 101
12 102

Generator Composition

The zip and enumerate functions are built-in Python functions. How could we write them ourselves? Try writing enumerate first, and then try to get both of these pieces of code to work:

for i, n in enumerate_with_next(nums_from(10)):
    print(i, n)
    if i == 2:
        break

for i, n in enumerate_with_next(make_three_gen()):
    print(i, n)
    if i == 2:
        break

This can be done with next or with a for loop.

def enumerate_with_next(g):
    i = 0
    while True:
        # next(g) raises StopIteration when g runs out. Beware: since
        # Python 3.7 (PEP 479), a StopIteration that escapes a generator
        # body becomes a RuntimeError, so the caller must break out of
        # the loop before next(g) is called on an exhausted g.
        yield i, next(g)
        i += 1

for i, n in enumerate_with_next(nums_from(10)):
    print(i, n)
    if i == 2:
        break

for i, n in enumerate_with_next(make_three_gen()):
    print(i, n)
    if i == 2:
        break


0 10
1 11
2 12
0 1
1 2
2 3

The code is simpler with for.

def enumerate_with_for(g):
    i = 0
    for x in g:
        yield i, x
        i += 1

for i, n in enumerate_with_for(nums_from(10)):
    print(i, n)
    if i == 2:
        break
0 10
1 11
2 12

Try writing zip with next and getting this code to work:

for n0, n1 in zip_with_next(nums_from(10), nums_from(100)):
    print(n0, n1)
    if n0 == 12:
        break

def zip_with_next(g0, g1):
    while True:
        # As with enumerate_with_next, this relies on the caller breaking
        # out of the loop; an exhausted generator would raise here.
        yield next(g0), next(g1)

for n0, n1 in zip_with_next(nums_from(10), nums_from(100)):
    print(n0, n1)
    if n0 == 12:
        break
10 100
11 101
12 102

It is not possible to write zip with a for loop alone. A for loop advances a single generator, but zip needs to advance two generators in lockstep.
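
You can, however, mix the two styles: drive one generator with for and advance the other manually with next. A sketch (the helper name zip_with_for_and_next is ours):

def zip_with_for_and_next(g0, g1):
    for x0 in g0:
        try:
            x1 = next(g1)
        except StopIteration:
            return  # g1 ran out first, so stop like zip does
        yield x0, x1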

Chaining Generators

A common pattern with generators is to write a generator that produces values from one generator and then another.

def gen_a():
    yield 1
    yield 2
    yield 3

def gen_b():
    yield 10
    yield 20
    yield 30
    
def chain_with_next(g0, g1):
    try: 
        while True:
            yield next(g0)
    except StopIteration:
        pass

    try:
        while True:
            yield next(g1)
    except StopIteration:
        pass



for x in chain_with_next(gen_a(), gen_b()):
    print(x, end=" ")

1 2 3 10 20 30 

We can rewrite the code above to use for, which you should try before reading on. One possible version is sketched below, followed by an even simpler approach using yield from.
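
Here is a for-based rewrite (a sketch; the helper name chain_with_for is ours):

def chain_with_for(g0, g1):
    for x in g0:
        yield x
    for x in g1:
        yield x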

def chain_with_yield_from(g0, g1):
    yield from g0
    yield from g1

for x in chain_with_yield_from(gen_a(), gen_b()):
    print(x, end=" ")

1 2 3 10 20 30 
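
The standard library already provides this pattern as itertools.chain, which accepts any number of iterables:

from itertools import chain

for x in chain(gen_a(), gen_b()):
    print(x, end=" ")

1 2 3 10 20 30 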

Back to LLMs

Here is a problem in the LLM world where generators are very helpful. It is less than 10 lines of code.

Suppose you have a large training corpus of text, say on the scale of a few TB. You cannot build an in-memory list of that size, so it is effectively unbounded. We want to tokenize this text to train an LLM. But, we also want to both split and pack tokens such that each training item is exactly N=2048 tokens long.

This is a little painful, because documents vary in length.

tokenizer = AutoTokenizer.from_pretrained("gpt2", padding_side="left", padding=True, clean_up_tokenization_spaces=False)
dataset = datasets.load_dataset("nuprl/engineering-llm-systems", name="wikipedia-northeastern-university", split="test")
dataset
Dataset({
    features: ['id', 'url', 'title', 'text'],
    num_rows: 2434
})

Here are three documents from the corpus. The first two are too long and need to be split. The third is too short. We can pad it, but we’ll make better use of memory by packing it into a training item with other documents.

len(tokenizer(dataset[0]["text"])["input_ids"]), len(tokenizer(dataset[1]["text"])["input_ids"]), len(tokenizer(dataset[500]["text"])["input_ids"])


Token indices sequence length is longer than the specified maximum sequence length for this model (17184 > 1024). Running this sequence through the model will result in indexing errors

(17184, 4948, 1133)

We start with a generator that tokenizes one document at a time:

def generate_tokenized(tokenizer, dataset):
    for doc in dataset:
        input_ids = tokenizer(doc["text"])["input_ids"]
        yield input_ids

for i, input_ids in enumerate(generate_tokenized(tokenizer, dataset)):
    if i == 5:
        break
    print(len(input_ids), end=" ")
17184 4948 4009 10108 6305 

We can now write a generator that yields training items of length 2048 or smaller.

def splitter(max_length, token_generator):
    for input_ids in token_generator:
        # Slicing past the end of the list is safe, so the final chunk is
        # simply shorter than max_length; no special case is needed.
        for i in range(0, len(input_ids), max_length):
            yield input_ids[i:i+max_length]

for i, input_ids in enumerate(splitter(2048, generate_tokenized(tokenizer, dataset))):
    if i == 10:
        break
    print(len(input_ids), end=" ")
2048 2048 2048 2048 2048 2048 2048 2048 800 2048 

To handle packing, we need to both split and pack simultaneously.

def generate_constant_length_from_buffer(max_length, buffer):
    while len(buffer) >= max_length:
        yield buffer[:max_length]
        del buffer[:max_length] # Remove the first max_length tokens from the buffer

    # There may be a few tokens left.

def generate_constant_length(max_length, token_generator):
    buffer = [ ]
    for input_ids in token_generator:
        buffer.extend(input_ids)
        yield from generate_constant_length_from_buffer(max_length, buffer)
    if len(buffer) > 0:
        yield buffer # The leftover tokens form one final, shorter item.

lens = [ ]
for i, input_ids in enumerate(generate_constant_length(2048, generate_tokenized(tokenizer, dataset))):
    lens.append(len(input_ids))

print(len(lens))
print(lens[-1])
2456
1030
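
As a sanity check, every training item except the last should be exactly 2048 tokens long, and only the final item may be shorter (assuming lens from the cell above):

assert all(l == 2048 for l in lens[:-1])
assert lens[-1] <= 2048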

Async Functions

Background

Python is effectively single-threaded: the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time.

Python 3.13, released on October 7, 2024, has experimental support for true concurrency:

https://docs.python.org/3.13/whatsnew/3.13.html#free-threaded-cpython

The purpose of asynchronous functions is to allow concurrent I/O: while one request is waiting on the network, another can make progress.

MODEL = "meta-llama/Meta-Llama-3.1-8B-Instruct"

CLIENT = OpenAI(base_url=BASE_URL, api_key=API_KEY)

def send_query(message: str):
    response = CLIENT.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "user", "content": message},
        ],
    )
    return response.choices[0].message.content

We have been using synchronous requests.

QUERIES = [
    "What is the capital of France?",
    "What is the capital of Germany?",
    "What is the capital of Italy?",
]

for query in QUERIES:
    print(send_query(query))

The capital of France is Paris.
The capital of Germany is Berlin.
The capital of Italy is Rome.

But, we don’t actually care about the order in which these results are processed. So, the goal is to send several requests simultaneously.

Basics of Async

The code below creates two timers that run concurrently, and the second one finishes first. Note that asyncio.create_task only schedules the coroutine; it starts running when the event loop next gets control.

  • The async keyword is used to define a function that runs asynchronously. It allows the function to be paused and resumed, making it suitable for tasks that involve waiting (e.g., I/O operations).

  • The await keyword is used inside an async function to pause its execution until the awaited operation completes. This allows other tasks to run during the waiting period, which improves efficiency for I/O-bound tasks.

async def timer(t, n):
    print(f"Timer {t} starts...")
    await asyncio.sleep(n)
    print(f"Timer {t} ends after {n} seconds")
    return t

t1 = asyncio.create_task(timer("t1", 2))
t2 = asyncio.create_task(timer("t2", 0.5))
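
Had we wanted this cell to block until both timers finish, we could await the tasks themselves, which also yields their return values (a sketch, not executed in this transcript; here the timers instead run in the background):

print(await t1, await t2)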

The OpenAI Async Interface

The OpenAI API has an async interface that you can use to issue requests concurrently to the LLM server.

ASYNC_CLIENT = AsyncOpenAI(base_url=BASE_URL, api_key=API_KEY)

resp1 =  asyncio.create_task(ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of France?"} ]))
resp2 = asyncio.create_task(ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of Germany?"} ]))
resp3 = asyncio.create_task(ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of Italy?"} ]))

await asyncio.sleep(5) # Should be long enough, right?

While this cell awaits the sleep, the event loop finally runs the timer tasks we scheduled earlier:

Timer t1 starts...
Timer t2 starts...
Timer t2 ends after 0.5 seconds
Timer t1 ends after 2 seconds

print(resp1.result().choices[0].message.content)
print(resp2.result().choices[0].message.content)
print(resp3.result().choices[0].message.content)

The capital of France is Paris.
The capital of Germany is Berlin.
The capital of Italy is Rome.

Awaiting Several Results

An explicit sleep is a bad idea: we may not sleep long enough, or we may sleep needlessly long. It is usually a bad idea to use asyncio.create_task like this. Here is a better approach: asyncio.gather awaits all of the requests and returns their results in order.

resp1 = ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of France?"} ])
resp2 = ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of Germany?"} ])
resp3 = ASYNC_CLIENT.chat.completions.create(
    model=MODEL, messages=[ {"role": "user", "content": "What is the capital of Italy?"} ])

results = await asyncio.gather(resp1, resp2, resp3) 

for result in results:
    print(result.choices[0].message.content)

The capital of France is Paris.
The capital of Germany is Berlin.
The capital of Italy is Rome.

A more concise approach:

async def send_queries(texts):
    requests = [ ASYNC_CLIENT.chat.completions.create(
        model=MODEL, messages=[ {"role": "user", "content": text} ]) for text in texts ]
    responses = await asyncio.gather(*requests)
    return [ response.choices[0].message.content for response in responses ]

results = await send_queries([
    "What is the capital of France?",
    "What is the capital of Germany?",
    "What is the capital of Italy?",
])

for result in results:
    print(result)

The capital of France is Paris.
The capital of Germany is Berlin.
The capital of Italy is Rome.
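
With only three queries, firing everything at once is fine. With thousands, you may want to cap how many requests are in flight at a time. One standard approach (a sketch, assuming the same ASYNC_CLIENT and MODEL; the name send_queries_bounded is ours) uses asyncio.Semaphore:

async def send_queries_bounded(texts, max_concurrent=8):
    sem = asyncio.Semaphore(max_concurrent)

    async def one(text):
        async with sem:  # at most max_concurrent requests run at once
            response = await ASYNC_CLIENT.chat.completions.create(
                model=MODEL, messages=[ {"role": "user", "content": text} ])
            return response.choices[0].message.content

    return await asyncio.gather(*[ one(text) for text in texts ])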

More Advanced Usage

Given a list of tasks, you can wait for the first one to complete, instead of waiting for all of them to complete.

We will get different capitals each time we run the cell below.

async def first_query(texts):
    requests = [ asyncio.create_task(ASYNC_CLIENT.chat.completions.create(
        model=MODEL, messages=[ {"role": "user", "content": text} ])) for text in texts ]
    done, pending = await asyncio.wait(requests, return_when=asyncio.FIRST_COMPLETED)
    return done.pop().result().choices[0].message.content

result = await first_query([
    "What is the capital of France?",
    "What is the capital of Germany?",
    "What is the capital of Italy?",
])

print(result)
The capital of Italy is Rome.
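
One caveat: asyncio.wait does not cancel the tasks that have not finished, so the slower requests keep running in the background. If you only want the first answer, cancel the rest (a sketch; the name first_query_and_cancel is ours):

async def first_query_and_cancel(texts):
    requests = [ asyncio.create_task(ASYNC_CLIENT.chat.completions.create(
        model=MODEL, messages=[ {"role": "user", "content": text} ])) for text in texts ]
    done, pending = await asyncio.wait(requests, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # stop the requests we no longer need
    return done.pop().result().choices[0].message.content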

We can also wait with a timeout. If the timeout expires, asyncio.wait_for cancels the pending request and raises asyncio.TimeoutError.

async def query_with_timeout(text):
    try:
        return await asyncio.wait_for(
            ASYNC_CLIENT.chat.completions.create(
                model=MODEL, messages=[{"role": "user", "content": text}]
            ),
            timeout=3.0
        )
    except asyncio.TimeoutError:
        return None

async def send_queries(texts):
    responses = await asyncio.gather(*[query_with_timeout(text) for text in texts])
    return [response.choices[0].message.content if response else "Timeout" for response in responses]

results = await send_queries([
    "What is the capital of France?",
    "What is the capital of Germany?",
    "What is the capital of Italy?",
    "Write a short story.",
])

for result in results:
    print(result)

The capital of France is Paris.
The capital of Germany is Berlin.
The capital of Italy is Rome.
Timeout