Skip to main content

Implementing an ORS Client

Learn how to build clients that interact with ORS servers to run episodes, train agents, or evaluate them.

Two Approaches

Option 1: Use OpenReward Python SDK

The simplest approach: the SDK handles all protocol details for you.
from openreward import OpenReward

client = OpenReward()
# Point the SDK at an ORS server running locally on port 8080.
env = client.environments.get(
    name="myenv",
    base_url="http://localhost:8080"
)

# Run episode
# NOTE: `task` is not defined in this snippet; it must be a task obtained
# beforehand (presumably from the environment's task listing — check the
# SDK documentation for the exact call).
with env.session(task=task) as session:
    prompt = session.get_prompt()
    result = session.call_tool("submit", {"answer": "42"})

Option 2: Custom HTTP Client

For other languages, or when you need full control, implement the HTTP protocol directly:

Python HTTP Client Example

import httpx
import json

class ORSClient:
    """Minimal synchronous HTTP client for an ORS server.

    All requests share one ``httpx.Client`` with a 60-second timeout.
    Non-2xx responses raise ``httpx.HTTPStatusError`` so callers can branch
    on the status code. Use the client as a context manager (or call
    :meth:`close`) to release the underlying connection pool.
    """

    def __init__(self, base_url: str):
        # Strip a trailing slash so f"{base_url}/{path}" never produces "//".
        self.base_url = base_url.rstrip("/")
        self.client = httpx.Client(timeout=60.0)

    def close(self) -> None:
        """Close the underlying HTTP connection pool."""
        self.client.close()

    def __enter__(self) -> "ORSClient":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    @staticmethod
    def _json(response):
        """Raise ``httpx.HTTPStatusError`` on a non-2xx status, else return the decoded JSON body."""
        response.raise_for_status()
        return response.json()

    def list_tools(self, env_name: str):
        """Return the list of tools exposed by *env_name*."""
        response = self.client.get(f"{self.base_url}/{env_name}/tools")
        return self._json(response)["tools"]

    def list_tasks(self, env_name: str, split: str):
        """Return the tasks of *env_name* for the given *split* (e.g. "train", "test")."""
        response = self.client.post(
            f"{self.base_url}/{env_name}/tasks",
            json={"split": split},
        )
        return self._json(response)["tasks"]

    def create_session(self):
        """Ask the server for a new session ID."""
        response = self.client.post(f"{self.base_url}/create_session")
        return self._json(response)["sid"]

    def create_episode(
        self,
        session_id: str,
        env_name: str,
        task_spec: dict,
        secrets: "dict | None" = None,
    ):
        """Create an episode of *env_name* for *task_spec*, bound to *session_id*.

        *secrets* is forwarded verbatim in the request body; it defaults to ``{}``.
        Returns the server's JSON response.
        """
        response = self.client.post(
            f"{self.base_url}/create",
            headers={"X-Session-ID": session_id},
            json={
                "env_name": env_name,
                "task_spec": task_spec,
                "secrets": secrets if secrets is not None else {},
            },
        )
        return self._json(response)

    def get_prompt(self, session_id: str, env_name: str):
        """Return the initial prompt of the episode bound to *session_id*."""
        response = self.client.get(
            f"{self.base_url}/{env_name}/prompt",
            headers={"X-Session-ID": session_id},
        )
        return self._json(response)

    def call_tool(self, session_id: str, env_name: str, tool_name: str, tool_input: dict):
        """Call *tool_name* with *tool_input* and return its output.

        The server replies with a Server-Sent Events stream; see
        :meth:`_parse_sse_result` for how it is decoded.

        Raises:
            httpx.HTTPStatusError: the server rejected the request.
            RuntimeError: the server reported an error, or the stream ended
                before an ``end`` event arrived.
        """
        with self.client.stream(
            "POST",
            f"{self.base_url}/{env_name}/call",
            headers={
                "X-Session-ID": session_id,
                "Accept": "text/event-stream",
            },
            json={"name": tool_name, "input": tool_input},
        ) as response:
            if response.is_error:
                # Read the body first so e.response.text is usable by callers.
                response.read()
                response.raise_for_status()
            return self._parse_sse_result(response.iter_lines())

    @staticmethod
    def _parse_sse_result(lines):
        """Decode the SSE lines of a tool call and return the tool output.

        ``chunk`` events carry fragments of a JSON document that are
        buffered until the ``end`` event supplies the final fragment. The
        assembled document is ``{"ok": ..., "output": ...}`` or
        ``{"ok": false, "error": ...}``. An ``error`` event aborts the call.
        """
        fragments = []
        # None until the first "event:" field; data seen before it is ignored.
        event_type = None
        for line in lines:
            # SSE field syntax is "name: value"; the space after the colon is
            # optional, and lines starting with ":" are comments (empty name).
            field, sep, value = line.partition(":")
            if not sep:
                continue
            if value.startswith(" "):
                value = value[1:]

            if field == "event":
                event_type = value
            elif field == "data":
                if event_type == "end":
                    result = json.loads("".join(fragments) + value)
                    if result.get("ok"):
                        return result["output"]
                    raise RuntimeError(result.get("error", "tool call failed"))
                if event_type == "chunk":
                    fragments.append(value)
                elif event_type == "error":
                    raise RuntimeError(value)
        raise RuntimeError("SSE stream ended without an 'end' event")

    def delete_episode(self, session_id: str):
        """Delete the episode bound to *session_id* and return the server's JSON response."""
        response = self.client.post(
            f"{self.base_url}/delete",
            headers={"X-Session-ID": session_id},
        )
        return self._json(response)

# Usage
client = ORSClient("http://localhost:8080")

# List available tools
tools = client.list_tools("math")
print(f"Available tools: {[t['name'] for t in tools]}")

# Get tasks
tasks = client.list_tasks("math", "train")

# Run episode
task = tasks[0]
session_id = client.create_session()

try:
    # The episode is bound to this session ID via the X-Session-ID header.
    client.create_episode(session_id, "math", task)

    # Assumes the prompt is a list of content blocks with a "text" field —
    # confirm against your environment.
    prompt = client.get_prompt(session_id, "math")
    print(f"Prompt: {prompt[0]['text']}")

    result = client.call_tool(session_id, "math", "submit", {"answer": "4"})
    print(f"Reward: {result['reward']}")
    print(f"Finished: {result['finished']}")

finally:
    # Always delete the episode, even if one of the calls above raised.
    client.delete_episode(session_id)

JavaScript/TypeScript Client

class ORSClient {
  constructor(private baseUrl: string) {}

  /** List the tools exposed by `envName`. Throws on a non-2xx response. */
  async listTools(envName: string) {
    const response = await fetch(`${this.baseUrl}/${envName}/tools`);
    if (!response.ok) {
      throw new Error(`listTools failed: HTTP ${response.status}`);
    }
    const data = await response.json();
    return data.tools;
  }

  /** Ask the server for a new session ID. Throws on a non-2xx response. */
  async createSession(): Promise<string> {
    const response = await fetch(`${this.baseUrl}/create_session`, {
      method: 'POST'
    });
    if (!response.ok) {
      throw new Error(`createSession failed: HTTP ${response.status}`);
    }
    const data = await response.json();
    return data.sid;
  }

  /**
   * Call a tool and return its output by decoding the Server-Sent Events stream.
   *
   * "chunk" events carry fragments of a JSON document that are buffered until
   * the "end" event supplies the final fragment. Throws on a non-2xx response,
   * an "error" event, `ok: false`, or a stream that ends without an "end" event.
   */
  async callTool(
    sessionId: string,
    envName: string,
    toolName: string,
    toolInput: any
  ): Promise<any> {
    const response = await fetch(`${this.baseUrl}/${envName}/call`, {
      method: 'POST',
      headers: {
        'X-Session-ID': sessionId,
        'Accept': 'text/event-stream',
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name: toolName, input: toolInput })
    });
    if (!response.ok || !response.body) {
      throw new Error(`callTool failed: HTTP ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let pending = '';   // incomplete line carried over between network reads
    let buffer = '';    // concatenated payload of "chunk" events
    let eventType = '';

    // Value of an SSE field line ("name: value", the space is optional), or null.
    const fieldValue = (line: string, name: string): string | null => {
      if (!line.startsWith(name + ':')) return null;
      const rest = line.slice(name.length + 1);
      return rest.startsWith(' ') ? rest.slice(1) : rest;
    };

    try {
      while (true) {
        const { done, value } = await reader.read();
        // `stream: true` keeps multi-byte characters that straddle two reads
        // intact; the final decode() flushes whatever is still buffered.
        pending += done ? decoder.decode() : decoder.decode(value, { stream: true });
        const lines = pending.split('\n');
        // Until the stream ends, the last piece may be a partial line.
        pending = done ? '' : (lines.pop() ?? '');

        for (const rawLine of lines) {
          const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;

          const event = fieldValue(line, 'event');
          if (event !== null) {
            eventType = event;
            continue;
          }
          const data = fieldValue(line, 'data');
          if (data === null) continue;

          if (eventType === 'end') {
            const result = JSON.parse(buffer + data);
            if (result.ok) {
              return result.output;
            }
            throw new Error(result.error);
          } else if (eventType === 'chunk') {
            buffer += data;
          } else if (eventType === 'error') {
            throw new Error(data);
          }
        }

        if (done) break;
      }
    } finally {
      // Stop the download if we returned or threw before the stream finished.
      await reader.cancel().catch(() => {});
    }

    throw new Error("SSE stream ended without an 'end' event");
  }
}

// Usage
const client = new ORSClient('http://localhost:8080');

// Top-level await only works in an ES module; otherwise wrap this in an async function.
const sessionId = await client.createSession();
// ... create episode, call tools

Episode Runner

Build a reusable episode runner:
class EpisodeRunner:
    """Drive an agent through complete ORS episodes.

    Subclass it and override :meth:`select_action` with the agent's policy.
    """

    def __init__(self, client: "ORSClient", env_name: str, max_steps: int = 100):
        self.client = client
        self.env_name = env_name
        # Safety limit so a policy that never finishes cannot loop forever.
        self.max_steps = max_steps

    def run_episode(self, task: dict) -> dict:
        """Run one complete episode for *task* and summarise it.

        Returns a dict with:
            success: the episode finished with a positive total reward.
            total_reward: sum of per-step rewards (missing/None counts as 0).
            steps: number of tool calls made.
            task: the task that was run.
            truncated: ``max_steps`` was reached before the episode finished.

        The episode is deleted on the server even if the agent raises.
        """
        session_id = self.client.create_session()

        try:
            self.client.create_episode(session_id, self.env_name, task)
            observation = self.client.get_prompt(session_id, self.env_name)

            total_reward = 0.0
            steps = 0
            finished = False

            while not finished and steps < self.max_steps:
                action = self.select_action(observation)
                result = self.client.call_tool(
                    session_id,
                    self.env_name,
                    action["tool"],
                    action["input"],
                )

                # The new observation is the content blocks returned by the tool.
                observation = result["blocks"]
                # `or 0.0` also covers an explicit "reward": null.
                total_reward += result.get("reward") or 0.0
                finished = bool(result["finished"])
                steps += 1

            return {
                "success": finished and total_reward > 0,
                "total_reward": total_reward,
                "steps": steps,
                "task": task,
                "truncated": not finished,
            }

        finally:
            self.client.delete_episode(session_id)

    def select_action(self, observation):
        """Return ``{"tool": name, "input": dict}`` for *observation*; override in a subclass."""
        raise NotImplementedError

Batch Evaluation

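Run an agent over many tasks one after another and aggregate the results (see Parallel Execution below for running them concurrently):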
def evaluate(client: "ORSClient", env_name: str, tasks: list, episode_fn=None) -> dict:
    """Evaluate an agent on *tasks* one after another and aggregate the results.

    Args:
        client: ORS client, passed through to the episode function.
        env_name: environment to run.
        tasks: task specs, e.g. from ``client.list_tasks``.
        episode_fn: callable ``(client, env_name, task) -> dict`` whose result
            has at least ``success``, ``total_reward`` and ``steps``. When
            omitted, a module-level ``run_episode`` with that signature is
            used; this guide does not define one, so either provide it or
            pass ``episode_fn`` (e.g. wrapping ``EpisodeRunner.run_episode``).

    Returns:
        ``success_rate``, ``avg_reward`` and ``avg_steps`` (all 0.0 for an
        empty task list) plus the per-task ``results``.
    """
    run = episode_fn if episode_fn is not None else run_episode
    results = []

    for index, task in enumerate(tasks):
        result = run(client, env_name, task)
        results.append(result)

        # Fall back to the task's position when it carries no "id".
        task_id = task.get("id", index)
        print(f"Task {task_id}: "
              f"{'PASS' if result['success'] else 'FAIL'} "
              f"(reward={result['total_reward']:.2f}, "
              f"steps={result['steps']})")

    if not results:
        # Avoid ZeroDivisionError when there is nothing to average.
        return {
            "success_rate": 0.0,
            "avg_reward": 0.0,
            "avg_steps": 0.0,
            "results": results,
        }

    count = len(results)
    return {
        "success_rate": sum(r["success"] for r in results) / count,
        "avg_reward": sum(r["total_reward"] for r in results) / count,
        "avg_steps": sum(r["steps"] for r in results) / count,
        "results": results,
    }

# Usage
client = ORSClient("http://localhost:8080")
# Evaluate on the "test" split (the single-episode example above used "train").
tasks = client.list_tasks("math", "test")

# NOTE: evaluate() needs an episode runner function; see its definition above.
metrics = evaluate(client, "math", tasks)
print(f"Success rate: {metrics['success_rate']:.1%}")
print(f"Average reward: {metrics['avg_reward']:.3f}")

Error Handling

def safe_episode_run(client, env_name, task):
    """Run an episode, reporting (rather than raising) HTTP, timeout and other errors.

    If a session was created it is always cleaned up. Cleanup is best-effort:
    ordinary errors from ``delete_episode`` are ignored, because the episode
    may already be gone. KeyboardInterrupt and SystemExit still propagate.
    """
    session_id = None

    try:
        session_id = client.create_session()
        client.create_episode(session_id, env_name, task)

        # ... episode logic

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            print("Session or environment not found")
        elif e.response.status_code == 400:
            print("Bad request - check inputs")
        else:
            print(f"HTTP error: {e}")

    except httpx.TimeoutException:
        print("Request timed out")

    except Exception as e:
        print(f"Error: {e}")

    finally:
        if session_id is not None:
            try:
                client.delete_episode(session_id)
            except Exception:
                pass  # Cleanup already done or session expired

Parallel Execution

Run multiple episodes concurrently:
import asyncio

async def run_episode_async(client, env_name, task):
    """Async episode runner — placeholder to be implemented.

    Implement it like ``EpisodeRunner.run_episode``, but with
    ``httpx.AsyncClient``. Until then it raises, rather than silently
    returning ``None``. A ``None`` would otherwise show up as ``None``
    entries in ``parallel_evaluate``'s results.
    """
    raise NotImplementedError(
        "run_episode_async is a placeholder; implement it with httpx.AsyncClient"
    )

async def parallel_evaluate(client, env_name, tasks, max_concurrent=5, episode_fn=None):
    """Run one episode per task concurrently, at most *max_concurrent* at a time.

    Args:
        client: ORS client, passed through to the episode function.
        env_name: environment to run.
        tasks: task specs to evaluate.
        max_concurrent: maximum number of episodes in flight at once.
        episode_fn: async callable ``(client, env_name, task)``; defaults to
            ``run_episode_async``.

    Returns:
        The episode results, in the same order as *tasks*.

    Raises:
        ValueError: if *max_concurrent* < 1. A zero-permit semaphore would
            otherwise block every episode forever.
    """
    if max_concurrent < 1:
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

    run = episode_fn if episode_fn is not None else run_episode_async
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_with_limit(task):
        async with semaphore:
            return await run(client, env_name, task)

    # gather() returns results in argument order, regardless of completion order.
    return await asyncio.gather(*(run_with_limit(task) for task in tasks))

# Usage
# Reuses `client` and `tasks` from the examples above. run_episode_async is only
# a placeholder here; implement it before running this. asyncio.run() cannot be
# called while another event loop is running (e.g. in Jupyter, `await` instead).
results = asyncio.run(parallel_evaluate(client, "math", tasks))

Next Steps


Key Takeaway: Building an ORS client is straightforward HTTP programming. Handle SSE for tool calls, manage session IDs, and implement proper error handling. Use the Python SDK for quick development or implement custom clients in any language.