
Implementing an ORS Server

This guide shows how to implement an ORS server, either using the Python SDK or from scratch in any language.

Two Approaches

Option 1: Use OpenReward Python SDK

Pros: Fast, handles protocol details, well-tested
Cons: Python only

Option 2: Implement HTTP Protocol

Pros: Any language, full control, no dependencies
Cons: More work, must handle all protocol details

Option 1: Python SDK Implementation

Basic Structure

from openreward.environments import Environment, Server, tool
from openreward.environments.types import ToolOutput, TextBlock
from pydantic import BaseModel

# Parameter schema (defined first, because the tool's type annotation
# below is evaluated when the class body runs)
class MyToolParams(BaseModel):
    param1: str
    param2: int

class MyEnvironment(Environment):
    """Your environment description"""

    @classmethod
    def list_splits(cls):
        """Return available splits"""
        return ["train", "test"]

    @classmethod
    def list_tasks(cls, split: str):
        """Return tasks for split"""
        # Placeholder tasks: each task is a JSON-serializable dict whose
        # fields are up to your environment.
        if split == "train":
            return [{"id": "train-0", "description": "Add 2 and 3"}]
        else:
            return [{"id": "test-0", "description": "Add 4 and 5"}]

    def get_prompt(self):
        """Generate prompt from task"""
        return [TextBlock(text=f"Task: {self.task_spec['description']}")]

    @tool
    def my_tool(self, params: MyToolParams) -> ToolOutput:
        """Tool implementation"""
        # Your logic here
        return ToolOutput(
            blocks=[TextBlock(text="Result")],
            reward=1.0,
            finished=True
        )

# Run server
if __name__ == "__main__":
    server = Server([MyEnvironment])
    server.run(port=8080)

Key Methods

list_splits(cls) - Required:
  • Class method (no instance needed)
  • Returns list of split names or Split objects
  • Example: return ["train", "validation", "test"]
list_tasks(cls, split) - Required:
  • Class method
  • Returns a list of JSON-serializable task objects
  • Task structure is environment-specific
get_prompt(self) - Required:
  • Instance method (has access to self.task_spec)
  • Returns Blocks (list of TextBlock/ImageBlock)
  • Called once per episode
setup(self) - Optional:
  • Called when episode starts
  • Initialize environment state
  • Can be async
teardown(self) - Optional:
  • Called when episode ends
  • Cleanup resources
  • Can be async
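The order in which a server invokes these hooks can be sketched in plain Python. This is not the SDK's internal code: `run_episode`, `maybe_await`, the toy `AdditionEnv`, and the dict-shaped tool results are hypothetical stand-ins. The sketch shows how sync and async hooks can be supported side by side, and why teardown belongs in a `finally` block.

```python
import asyncio
import inspect
from typing import Any


async def maybe_await(value: Any) -> Any:
    """Return value, awaiting it first if a hook returned a coroutine."""
    if inspect.isawaitable(value):
        return await value
    return value


async def run_episode(env: Any, actions: list[tuple[str, Any]]) -> list[Any]:
    """Hypothetical driver: setup -> get_prompt -> tool calls -> teardown."""
    if hasattr(env, "setup"):
        await maybe_await(env.setup())
    try:
        outputs = [env.get_prompt()]  # prompt is generated once per episode
        for tool_name, params in actions:
            result = await maybe_await(getattr(env, tool_name)(params))
            outputs.append(result)
            if result.get("finished"):
                break  # no further tool calls after the episode ends
        return outputs
    finally:
        # teardown runs even if a tool call raises
        if hasattr(env, "teardown"):
            await maybe_await(env.teardown())


class AdditionEnv:
    """Toy environment: an async setup hook and a sync teardown hook."""

    def __init__(self, task_spec: dict) -> None:
        self.task_spec = task_spec
        self.events: list[str] = []

    async def setup(self) -> None:
        self.events.append("setup")

    def get_prompt(self) -> str:
        return f"Task: {self.task_spec['description']}"

    def submit(self, answer: int) -> dict:
        reward = 1.0 if answer == self.task_spec["answer"] else 0.0
        return {"reward": reward, "finished": True}

    def teardown(self) -> None:
        self.events.append("teardown")


env = AdditionEnv({"description": "Add 2 and 3", "answer": 5})
outputs = asyncio.run(run_episode(env, [("submit", 5), ("submit", 6)]))
```

The second `submit` is never executed because the first one finished the episode.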

Tool Decorator

@tool
def submit(self, params: SubmitParams) -> ToolOutput:
    """Tool docstring becomes description"""
    # Tool logic
    return ToolOutput(...)
Requirements:
  • Decorated with @tool
  • Takes self and one Pydantic model parameter
  • Returns ToolOutput
  • Can be async
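To make the registration rules concrete, here is a stdlib-only sketch of how a decorator like `@tool` can mark methods, and how a server could then list them using each docstring as the description. It is not the SDK's implementation: `TOOL_MARKER`, `collect_tools`, and the `is_async` flag are hypothetical, and parameter schemas are left out.

```python
import inspect
from typing import Any, Callable

TOOL_MARKER = "_is_tool"  # hypothetical attribute name used as a flag


def tool(func: Callable) -> Callable:
    """Mark a method as a tool (illustrative, not the SDK decorator)."""
    setattr(func, TOOL_MARKER, True)
    return func  # returned unchanged, so async methods stay async


def collect_tools(env_cls: type) -> list[dict[str, Any]]:
    """Describe every @tool method on env_cls, sorted by name."""
    tools = []
    for name, member in inspect.getmembers(env_cls, predicate=inspect.isfunction):
        if getattr(member, TOOL_MARKER, False):
            tools.append({
                "name": name,
                "description": inspect.getdoc(member) or "",
                "is_async": inspect.iscoroutinefunction(member),
            })
    return tools


class Example:
    @tool
    def submit(self, params):
        """Submit your final answer."""

    @tool
    async def search(self, params):
        """Search the task corpus."""

    def helper(self):
        """Not a tool, so it is not listed."""


tools = collect_tools(Example)
```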

Complete Example

See Quick Start for a working GSM8K environment.

Option 2: Custom HTTP Implementation

Required Endpoints

Implement these HTTP endpoints:

Discovery (no auth):
  • GET /health
  • GET /list_environments
  • GET /{env_name}/tools
  • GET /{env_name}/splits
  • POST /{env_name}/tasks
Session Management (all except /create_session, which issues the ID, require X-Session-ID):
  • POST /create_session
  • POST /create
  • POST /delete
  • POST /ping
Episode Interaction (requires X-Session-ID):
  • GET /{env_name}/prompt
  • POST /{env_name}/call (returns SSE stream)
See HTTP API Reference for complete specs.
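As a sketch of how a hand-written server might enforce these groupings before dispatching, here is a route table in Python. The regex patterns, the `check_request` helper, and the choice to exempt `/create_session` from the header check (it is the call that issues the ID in the Node.js example) are assumptions for illustration, not part of the spec.

```python
import re

# (method, path pattern, requires X-Session-ID), mirroring the lists above.
ROUTES = [
    ("GET", r"/health", False),
    ("GET", r"/list_environments", False),
    ("GET", r"/[^/]+/tools", False),
    ("GET", r"/[^/]+/splits", False),
    ("POST", r"/[^/]+/tasks", False),
    ("POST", r"/create_session", False),  # assumption: issues the session ID
    ("POST", r"/create", True),
    ("POST", r"/delete", True),
    ("POST", r"/ping", True),
    ("GET", r"/[^/]+/prompt", True),
    ("POST", r"/[^/]+/call", True),
]


def check_request(method: str, path: str, headers: dict[str, str]) -> int:
    """Return 200 if the request may be dispatched, else an error status.

    404 = no matching route, 400 = route needs X-Session-ID but it is missing.
    """
    lowered = {k.lower(): v for k, v in headers.items()}  # headers are case-insensitive
    for route_method, pattern, needs_session in ROUTES:
        if method == route_method and re.fullmatch(pattern, path):
            if needs_session and not lowered.get("x-session-id"):
                return 400
            return 200
    return 404
```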

Example: Node.js/TypeScript

import express from 'express';
import { randomUUID } from 'crypto';

// Placeholder environment: replace callTool with your own task logic.
class EnvironmentInstance {
  constructor(public taskSpec: unknown) {}

  async callTool(name: string, input: unknown): Promise<unknown> {
    throw new Error(`Tool not implemented: ${name}`);
  }
}

const app = express();
app.use(express.json());

// In-memory session store: session ID -> environment instance
const sessions = new Map<string, EnvironmentInstance>();

// Health check
app.get('/health', (req, res) => {
  res.json({ status: 'ok' });
});

// List environments
app.get('/list_environments', (req, res) => {
  res.json(['myenvironment']);
});

// List tools
app.get('/:envName/tools', (req, res) => {
  res.json({
    tools: [
      {
        name: 'submit',
        description: 'Submit answer',
        input_schema: {
          type: 'object',
          properties: {
            answer: { type: 'string' }
          },
          required: ['answer']
        }
      }
    ]
  });
});

// Create session: issue a new session ID
app.post('/create_session', (req, res) => {
  const sid = randomUUID();
  res.json({ sid });
});

// Create episode: attach an environment instance to the session
app.post('/create', (req, res) => {
  const sid = req.headers['x-session-id'];
  if (typeof sid !== 'string' || !sid) {
    return res.status(400).json({ error: 'Missing X-Session-ID header' });
  }
  const { env_name, task_spec, secrets } = req.body;

  const env = new EnvironmentInstance(task_spec);
  sessions.set(sid, env);

  res.json({ sid });
});

// Call tool (SSE)
app.post('/:envName/call', async (req, res) => {
  const sid = req.headers['x-session-id'];
  const { name, input } = req.body;

  const env = typeof sid === 'string' ? sessions.get(sid) : undefined;
  if (!env) {
    return res.status(404).json({ error: 'Session not found' });
  }

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Send task ID (each SSE field ends with a newline, each event with a blank line)
  const taskId = randomUUID();
  res.write(`event: task_id\ndata: ${taskId}\n\n`);

  // Execute tool
  try {
    const result = await env.callTool(name, input);
    const data = JSON.stringify({ ok: true, output: result });
    res.write(`event: end\ndata: ${data}\n\n`);
  } catch (error) {
    // Keep the message on one line so it stays a single SSE data field
    const message = error instanceof Error ? error.message : String(error);
    res.write(`event: error\ndata: ${message.replace(/\r?\n/g, ' ')}\n\n`);
  }

  res.end();
});

app.listen(8080, () => {
  console.log('ORS server running on port 8080');
});
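On the client side, the `/call` response is an SSE stream: a `task_id` event followed by `end` or `error`. A minimal stdlib parser for that framing is sketched below, assuming each event uses only `event:` and `data:` fields as in the example server; `parse_sse` is a hypothetical helper, not part of any SDK.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event, data) pairs from SSE lines.

    A blank line dispatches the current event; several data lines are joined
    with newlines; an unterminated event at end of stream is dropped.
    """
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used for keep-alives
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one optional space after the colon
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)


stream = (
    "event: task_id\n"
    "data: 123e4567\n"
    "\n"
    "event: end\n"
    'data: {"ok": true, "output": {"reward": 1.0}}\n'
    "\n"
)
events = list(parse_sse(stream.splitlines(keepends=True)))
end_payload = json.loads(events[-1][1])
```

Against a live server, pass the decoded response lines instead, for example `(line.decode("utf-8") for line in urllib.request.urlopen(request))`, where `request` is a `urllib.request.Request` for `POST /{env_name}/call` carrying the `X-Session-ID` header and a JSON body with `name` and `input`.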

Session Management

Key points:
  • Store a mapping from session ID to environment instance
  • Expire sessions after a 15-minute timeout
  • Release the session's environment instance on /delete
  • Handle concurrent sessions safely
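The points above can be combined into a small, thread-safe Python session store. This is a sketch under assumptions: the class and method names are hypothetical, and it treats the 15-minute timeout as an idle timeout refreshed on every lookup (so a `/ping` handler that calls `get` would keep a session alive). Expired entries are returned by `sweep()` rather than silently dropped, so the caller can run each environment's teardown.

```python
import threading
import time
from typing import Any, Callable, Optional

SESSION_TIMEOUT_S = 15 * 60  # the 15-minute timeout from the list above


class SessionStore:
    """Session ID -> environment instance, with idle expiry."""

    def __init__(self, timeout_s: float = SESSION_TIMEOUT_S,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout = timeout_s
        self._clock = clock  # injectable so expiry can be tested
        self._lock = threading.Lock()
        self._sessions: dict[str, tuple[Any, float]] = {}

    def put(self, sid: str, env: Any) -> None:
        with self._lock:
            self._sessions[sid] = (env, self._clock())

    def get(self, sid: str) -> Optional[Any]:
        """Return the environment and refresh its last-seen time.

        Returns None if the session is unknown or expired; expired entries
        stay in place until sweep() hands them back for cleanup.
        """
        with self._lock:
            entry = self._sessions.get(sid)
            if entry is None:
                return None
            env, last_seen = entry
            now = self._clock()
            if now - last_seen > self._timeout:
                return None
            self._sessions[sid] = (env, now)
            return env

    def delete(self, sid: str) -> Optional[Any]:
        """Remove a session (e.g. on /delete) and return its environment."""
        with self._lock:
            entry = self._sessions.pop(sid, None)
        return entry[0] if entry else None

    def sweep(self) -> list[Any]:
        """Remove all expired sessions and return their environments."""
        now = self._clock()
        with self._lock:
            expired = [sid for sid, (_, seen) in self._sessions.items()
                       if now - seen > self._timeout]
            return [self._sessions.pop(sid)[0] for sid in expired]
```

In a running server, `sweep()` could be called periodically from a background thread, followed by each returned environment's teardown.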

Error Handling

Return proper HTTP status codes:
  • 400 - Bad request (invalid input)
  • 404 - Not found (session, tool, environment)
  • 500 - Server error
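For errors raised while a tool is executing, send an SSE error event instead of an HTTP error status: by then the stream has started and the 200 status has already been sent.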
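One way to keep the two error paths separate is sketched below in Python. The exception class names and helpers are hypothetical. Errors detected before streaming starts map to an HTTP status; later failures are framed as an SSE `error` event. `sse_event` also splits multi-line messages, because a raw newline inside a single `data:` line would end that field early.

```python
class ORSError(Exception):
    """Base class for request errors; subclasses choose the HTTP status."""
    status = 500


class BadRequest(ORSError):
    status = 400  # invalid input


class NotFound(ORSError):
    status = 404  # unknown session, tool, or environment


def http_status_for(exc: Exception) -> int:
    """Status for errors raised before the SSE stream starts."""
    return exc.status if isinstance(exc, ORSError) else 500


def sse_event(event: str, data: str) -> str:
    """Frame one SSE event, giving each line of data its own 'data:' field."""
    lines = data.splitlines() or [""]
    body = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event}\n{body}\n"  # trailing blank line ends the event
```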

Best Practices

1. Validate Inputs

@tool
def submit(self, params: SubmitParams) -> ToolOutput:
    if not isinstance(params.answer, (int, float)):
        return ToolOutput(
            blocks=[TextBlock(text="Error: Answer must be a number")],
            reward=-0.1,
            finished=True
        )
    # ... rest of logic
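When the answer arrives as free text (for example, a `str` field in the parameter model), parsing and range checks have to happen inside the tool. The stdlib sketch below uses `parse_numeric_answer`, a hypothetical helper whose error string could be returned in a `TextBlock` as above. It strips thousands separators, rejects booleans (which `isinstance(x, int)` would otherwise accept), and rejects NaN and infinity.

```python
import math
from typing import Optional, Union


def parse_numeric_answer(raw: Union[str, int, float]) -> tuple[Optional[float], Optional[str]]:
    """Return (value, None) on success or (None, error_message) on failure."""
    if isinstance(raw, bool):  # bool is a subclass of int, so check it first
        return None, "Error: Answer must be a number, not a boolean"
    if isinstance(raw, str):
        cleaned = raw.strip().replace(",", "")  # accept "1,234"
        try:
            value = float(cleaned)
        except ValueError:
            return None, f"Error: Could not parse {raw!r} as a number"
    elif isinstance(raw, (int, float)):
        value = float(raw)
    else:
        return None, "Error: Answer must be a number"
    if not math.isfinite(value):  # float("nan") and float("inf") parse fine
        return None, "Error: Answer must be finite"
    return value, None
```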

2. Use Type Hints

from pydantic import BaseModel, Field

class SubmitParams(BaseModel):
    answer: float = Field(description="Your numeric answer")
    reasoning: str = Field(default="", description="Optional explanation")

3. Handle Async Operations

@tool
async def call_llm(self, params: CallLLMParams) -> ToolOutput:
    # Async LLM call
    response = await llm_client.complete(params.prompt)

    return ToolOutput(
        blocks=[TextBlock(text=response)],
        reward=0.0,
        finished=False
    )
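An awaited call that hangs would stall the episode, so it can help to bound it with a timeout. A runnable sketch follows. `FakeLLMClient` is a hypothetical stand-in for `llm_client`, and the dicts stand in for `ToolOutput` fields. On timeout the tool reports an error but leaves the episode open (`finished: False`), which is one possible design choice.

```python
import asyncio


class FakeLLMClient:
    """Hypothetical stand-in for an async LLM client."""

    def __init__(self, delay_s: float) -> None:
        self.delay_s = delay_s

    async def complete(self, prompt: str) -> str:
        await asyncio.sleep(self.delay_s)  # simulate network latency
        return f"echo: {prompt}"


async def call_llm_with_timeout(client: FakeLLMClient, prompt: str,
                                timeout_s: float) -> dict:
    """Await the client, returning a non-final error result on timeout."""
    try:
        text = await asyncio.wait_for(client.complete(prompt), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"text": f"Error: LLM call timed out after {timeout_s}s",
                "reward": 0.0, "finished": False}
    return {"text": text, "reward": 0.0, "finished": False}


async def main() -> list:
    fast = FakeLLMClient(delay_s=0.01)
    slow = FakeLLMClient(delay_s=1.0)
    # Both calls run concurrently on the same event loop.
    return await asyncio.gather(
        call_llm_with_timeout(fast, "hi", timeout_s=0.5),
        call_llm_with_timeout(slow, "hi", timeout_s=0.05),
    )


results = asyncio.run(main())
```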

4. Implement Proper Cleanup

import shutil

def teardown(self):
    # Close connections
    if hasattr(self, 'db_connection'):
        self.db_connection.close()

    # Clean up temp files
    if hasattr(self, 'temp_dir'):
        shutil.rmtree(self.temp_dir)
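An alternative to a chain of `hasattr` checks is to register each resource with `contextlib.ExitStack` as soon as it is created, so teardown releases exactly what setup acquired, in reverse order. A sketch with a hypothetical `WorkspaceEnv` (plain Python, not tied to the SDK's base class) that owns a temp directory and an open log file:

```python
import contextlib
import os
import shutil
import tempfile


class WorkspaceEnv:
    """Hypothetical environment that owns a temp directory and a file handle."""

    def setup(self) -> None:
        self._stack = contextlib.ExitStack()
        self.temp_dir = tempfile.mkdtemp(prefix="ors-")
        # Callbacks run in reverse registration order when the stack closes.
        self._stack.callback(shutil.rmtree, self.temp_dir, ignore_errors=True)
        log_path = os.path.join(self.temp_dir, "episode.log")
        self.log_file = self._stack.enter_context(open(log_path, "w"))

    def teardown(self) -> None:
        # Safe to call twice, or when setup never ran.
        stack = getattr(self, "_stack", None)
        if stack is not None:
            stack.close()  # closes the log file, then removes the directory
            self._stack = None
```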

5. Add Logging

import logging

logger = logging.getLogger(__name__)

@tool
def submit(self, params: SubmitParams) -> ToolOutput:
    logger.info(f"Received answer: {params.answer}")

    correct = (params.answer == self.task_spec["answer"])

    logger.info(f"Answer {'correct' if correct else 'incorrect'}")

    return ToolOutput(...)
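Two refinements to the snippet above are sketched here. First, passing values as logger arguments (`%s`) defers formatting until a handler actually emits the record. Second, comparing numeric answers with `math.isclose` avoids misgrading floats that exact `==` rejects. `grade` is a hypothetical helper, and the tolerances are illustrative assumptions to tune per task.

```python
import logging
import math

logger = logging.getLogger(__name__)


def grade(answer: float, expected: float, rel_tol: float = 1e-9,
          abs_tol: float = 1e-6) -> float:
    """Return 1.0 if answer is within tolerance of expected, else 0.0."""
    # %-style arguments are only formatted if the record is actually emitted.
    logger.info("Received answer: %s (expected %s)", answer, expected)
    correct = math.isclose(answer, expected, rel_tol=rel_tol, abs_tol=abs_tol)
    logger.info("Answer %s", "correct" if correct else "incorrect")
    return 1.0 if correct else 0.0


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s: %(message)s")
    print(grade(0.1 + 0.2, 0.3))  # prints 1.0; exact == would mark this wrong
```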

Testing Your Server

See Testing Locally for a comprehensive testing guide. For a quick check:
# Start server
python server.py

# Test in another terminal
curl http://localhost:8080/health
curl http://localhost:8080/list_environments
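The same two checks can be scripted with the standard library, which is handy in CI. `smoke_test` is a hypothetical helper; it assumes both endpoints return JSON bodies, as in the examples above. `urllib.request.urlopen` raises `HTTPError` on 4xx and 5xx responses, so the explicit status check only catches other non-200 successes.

```python
import json
import urllib.request


def smoke_test(base_url: str, timeout_s: float = 5.0) -> dict:
    """Call the two discovery endpoints and return their decoded JSON bodies."""
    results = {}
    for path in ("/health", "/list_environments"):
        url = base_url.rstrip("/") + path
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            if resp.status != 200:
                raise RuntimeError(f"{path} returned HTTP {resp.status}")
            results[path] = json.loads(resp.read().decode("utf-8"))
    return results
```

With the server running, `smoke_test("http://localhost:8080")` should return the health payload and the environment list.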

Deployment

Local Development

python server.py

Production Deployment

Docker:
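FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# The server must listen on 0.0.0.0 (not 127.0.0.1) to be reachable through -p
EXPOSE 8080
CMD ["python", "server.py"]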
Run:
docker build -t my-ors-server .
docker run -p 8080:8080 my-ors-server

Cloud Deployment

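Deploy to any platform that can run the server as a long-lived HTTP process. If your session store is in memory and you run several instances, route every request for a session to the same instance (or run a single instance):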
  • AWS: ECS, Lambda, EC2
  • GCP: Cloud Run, Compute Engine
  • Azure: Container Instances, App Service
  • Fly.io: fly launch
  • Railway: Connect GitHub repo

Key Takeaway: Implementing an ORS server is straightforward. Use the Python SDK for quick development, or implement the HTTP protocol in any language for full control. Focus on proper reward design and episode termination.