-
-
Save mklokocka/1c459b2b3eadaa9bc4b229b4bc4470f5 to your computer and use it in GitHub Desktop.
Example for aiocassandra issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
from aiohttp import web | |
from aiocassandra import aiosession | |
from cassandra.cluster import Cluster | |
from cassandra.query import SimpleStatement | |
logging.basicConfig(level=logging.DEBUG) | |
routes = web.RouteTableDef() | |
cluster = Cluster(executor_threads=10) | |
session = cluster.connect(wait_for_all_pools=True) | |
aiosession(session) | |
async def helper(resp): | |
queue = asyncio.Queue(maxsize=10) | |
async def queue_consumer(target, queue, *args): | |
try: | |
while True: | |
chunk = await queue.get() | |
await target(chunk, *args) | |
queue.task_done() | |
except asyncio.CancelledError: | |
logging.debug('cancelled error in consumer') | |
async def cass(queue): | |
cql = 'SELECT * FROM system.size_estimates;' | |
statement = SimpleStatement(cql, fetch_size=50) | |
async with session.execute_futures(statement) as results: | |
async for result in results: | |
await queue.put(result) | |
async def process_queue(data): | |
await resp.write(b'row processed\n') | |
tasks = [asyncio.ensure_future(cass(queue))] * 100 + [asyncio.ensure_future(queue_consumer(process_queue, queue))] | |
try: | |
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) | |
except Exception as e: | |
logging.debug(f'got exc {e}') | |
print(e) | |
finally: | |
for task in tasks: | |
logging.debug(f'cancel:{task}') | |
task.cancel() | |
@routes.get('/endpoint') | |
async def endpoint(request): | |
resp = web.StreamResponse() | |
await resp.prepare(request) | |
await helper(resp) | |
return resp | |
app = web.Application() | |
app.router.add_routes(routes) | |
if __name__ == "__main__": | |
web.run_app(app) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment