LINUX.ORG.RU

Очереди задач, потоки, мультипроцессы.

 , ,


0

2

В общем. Есть такая проблема. Нужно сделать систему для распределенного выполнения задач. Причем, задачи эти крутятся долго - могут день, два. И могут добавлятся, так и удалятся.

И вопрос, каким образом лучше сделать такую систему? Характер задач - много общение с БД, сторонними REST API + обработка входящих данных от REST API - проверка по условиям приходящих данных. Те в большей мере IO, но и CPU также есть.

Сейчас есть с горем пополам работающее решение (писало чудо-юдо до меня). Решение состоит в том, чтобы на каждую задачу плодить thread… Сейчас среднее количество таких задач подошло к 150 и решение это работает… Откровенно х$%@#.. В общем, надо переделывать.

P.S Сам пока думаю о asyncio, или redis + multiprocessing + asyncio. Второе не особо нравится из-за нежелания бороться с пересозданием connection и т.п проблем. В идеале бы 1000 таких задач без проблем держать. Сейчас в одном интерпретаторе через потоки 150 и все стало очень медленно. По ресурсам CPU решение не критично. Есть сервер на 32 ядра. Можно грузить хоть все.

GIL? Или какая реализация?

В действительности, если уж собрался переписывать - то может, лучше жабка/котлин? С многопотоком удобнее работать будет.

Deleted
()
Ответ на: комментарий от Deleted

Сроки жмут. Поэтому весь проект переписывать на другой язык сейчас не вариант, увы. Реализация да, GIL. Если бы GIL не было, то и проблем в общем то, таких больших не было сейчас.

crarkie
() автор топика

Если я правильно понял что задачи независимы то всё проще простого же. Для начала тебе нужна очередь (как я понимаю задач не супер много, поэтому её можно делать на чём угодно, хоть на файлах, хоть на postgresql, хоть на redis, хоть на этих модных хреноmq).

Потом, тебе нужен обработчик на asyncio. Он может выполнять несколько задач конкурентно, вплоть до какого-то потолка который зависит от реального соотношения CPU и IO в задаче, и потолок этот возможно будет меньше 100% загрузки 1 CPU, поскольку чем больше времени тратится в сыром CPU тем хуже работает asyncio -> растут задержки IO и пропускная способность падает. Это можно прикинуть, выяснить экспериментально или даже сделать автотюнинг. Может статься что тебе и одного ядра хватит на всё.

А если нет, то таких независимых обработчиков запускаешь от «сколько хватит» до «столько, чтобы загрузить все ядра» и тем самым тривиально масштабируешься. Если очередь позволяет можно даже на несколько машин.

Вроде всё. Все ядра тебе доступны, о проблемах с GIL и многопоточностью можешь даже не задумываться, тяжёлых ресурсов (типа процессов или подключений к БД) нужно тратить фиксированное число - O(обработчиков) вместо O(количества одновременных задач), код на asyncio получается простой.

slovazap ★★★★★
()
Последнее исправление: slovazap (всего исправлений: 1)
Ответ на: комментарий от WitcherGeralt

Ты ищешь ProcessPoolExecutor, он позволяет запустить таск в отдельном процессе и ждать его асинхронно.

Мне ответ ждать не нужно. По сути, суть задачи в обработке данных от другого сервиса и обращении в другим сервисам + запись статистики в БД. И эти задачи не заверщаются, а крутятся долгое время.

crarkie
() автор топика
Ответ на: комментарий от crarkie

Не принципиально.

По хорошему — поделить сервис и юзать очереди. А если good enough is enough — ProcessPoolExecutor в самый раз, только ноги береги.

WitcherGeralt ★★
()
Ответ на: комментарий от WitcherGeralt

ProcessPoolExecutor в самый раз, только ноги береги. Вот насчет этого. А что, если мне надо обращаться к Django ORM и к Redis(используется в качестве кеша)? Что с connections?

crarkie
() автор топика
Ответ на: комментарий от crarkie

Сейчас там все сделано очень криво. По хорошему, мб лучше на asyncio переписать? Вот отрывок кода:

def run_sync_task(task_id):
    loop = asyncio.new_event_loop()
    task = loop.create_task(run_task(task_id))
    try:
        loop.run_until_complete(task)
    except Exception as ex:
        logger.exception(f'BUG:Unexpected Exception {ex}{task_id}\n')
        bug_bot.send_bug('Unexpected exception: \n' + traceback.format_exc())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


class RedisQueue:
    def __init__(self, connection, key):
        self.connection = connection
        self.key = key

    async def get(self):
        while not self:
            await asyncio.sleep(1)
        return self.connection.rpop(self.key).decode('utf-8')

    def put(self, value):
        return self.connection.lpush(self.key, value)

    def __bool__(self):
        return bool(len(self))

    def __len__(self):
        return self.connection.llen(self.key)


class TasksRunner:
    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.queue = RedisQueue(redis, 'tasks_queue')

    async def run_tasks(self):
        for task in models.Task.objects.filter(
                state__in=['running', 'sleeping', 'rebooting', 'failed']):

            try:
                task.change_state('stopped')
                task.change_state('retry')
            except:
                try:
                    task.change_state('retry')
                except:
                    try:
                        task.change_state('failed')
                    except:
                        task.change_state('running')
        while True:
            self.loop.run_in_executor(None, run_sync_task, await self.queue.get())

    async def add_func_to_queue(self, func, args):
        self.loop.run_in_executor(None, func, args)
        return 'OK'

    def run(self):
        self.loop.run_until_complete(self.run_tasks())

Не знаю, зачем в run_sync_task делать loop на одну задачу. Но, в run_task используется await функции и async генераторы (хотя, в генераторах запросы через requests). В общем, хотелось бы переписать все эти и с минимальными усилиями.

crarkie
() автор топика
Ответ на: комментарий от crarkie

Соединения нужно устанавливать уже после того как процесс форкнется. Проще всего сделать ленивую инициализацию.

Вообще, джанга ведь всё меняет, ORM в ней синхронный. Вебню асинхронную уже, вроде бы сделали, но тут я посоветовать ничего не могу, я довольно давно с ней имел дело, когда asyncio ещё не было, и оно у них, видимо, всё ещё настолько говно, что до сих пор об асинхроне в документации почти ничего нет. Но переписывать однозначно нужно, скрещивание ужа с ежом до добра не доведёт.

Чтобы обойтись малой кровью, тебе, наверное, проще переписать наоборот всё синхронно. У тебя кроме редиса что-то асинхронное есть вообще? Для него, разумеется, есть обычная синхронная библиотека.

Если хочешь полноценную асинхронщину, то придётся выбросить джангу, то есть вообще всё переписывать, видимо. Можно, конечно, по частям. Но объём работы от этого только вырастет.

WitcherGeralt ★★
()
Последнее исправление: WitcherGeralt (всего исправлений: 1)
Ответ на: комментарий от WitcherGeralt

Чтобы обойтись малой кровью, тебе, наверное, проще переписать наоборот всё синхронно. У тебя кроме редиса что-то асинхронное есть вообще? Для него, разумеется, есть обычная синхронная библиотека.

Думаю, затык в БД будет не слишком большой. Там IO нагрузки скорее больше в запросах по сети. Для этого я перепишу все на aiohttp. Для Redis тоже есть async библиотека. Пусть доступ к БД будет синхронных. База данных на том же сервере. Так что не думаю, что это будет тормозить сильно. При желании, там 60-70% можно вынести в Redis. И промежуточно раз в n минут записывать в БД. Синхронно переписать точно не вариант. Там много задач и синхронно там будет затык. Там и сейчас по сути основной затык в большом количестве потоков и переключении контекста.

Какая у меня сейчас идея - единая очередь и PoolExecutor. в executor запускается worker. А worker в свою очередь запускает asyncio loop, в котором крутятся задачи. Сделать limit по задачам на каждый worker (допустим, на 1 worker 30 задач). Если появляется в очереди новая задача, то worker смотрит, есть ли у него еще место до лимита. Если да, то ставит задачу из очереди в asyncio loop. Другой момент, как придумать балансировку. Чтобы все worker были одинаково загружены. Т.е, допустим, выбирать самый незагруженный задачами worker. Ну или просто забить. Пусть 1 worker возьмет 50 задач, еще один еще 50. А дальше уже будут пустовать и ждать новых задач. Так сказать, на будущее.

crarkie
() автор топика
Ответ на: комментарий от crarkie

на 1 worker 30 задач

Это для общения в внешними API?

Чтобы все worker были одинаково загружены. Т.е, допустим, выбирать самый незагруженный задачами worker

Так если они все будут ломиться в очередь, то как раз самый свободный и будет выхватывать задачу. Я бы даже лимит не делал, ибо из-за него ты рискуешь недоутилизировать ядра и тормозить на ровном месте.

А почему бы тебе не делать всё в рамках одного процесса? Можно просто запустить много инстансов API и всё.

WitcherGeralt ★★
()
Ответ на: комментарий от WitcherGeralt

А почему бы тебе не делать всё в рамках одного процесса? Можно просто запустить много инстансов API и всё.

Потому что оно сейчас так и сделано. Но потоками. И работает это медленно. Можно и сделать asyncio задачами. Но тк задач много и они не останавливаются, а работают все время, то боюсь, будет снова затык. Поэтому, хочу разбить задачи на n процессов по x в каждом. Т.к сервер на 32 ядра, то зачем ему простаивать? Ну и каждая задача может быть как остановлена/удалена, так и запущенна новая. Сами задачи регистрируются в DB. Сама запущенная задача смотрит свой статус в redis. Если через django в redis меняется статус, то задача завершается/перезапускается, etc. Основная проблема, повторюсь же, что задача крутится все время. Если бы она делала действия и завершалась, то без проблем бы решил уже все через Pool. Но сейчас делать pool на 150 процессов, а потом увеличивать по мере увеличения количества задач не кажется хорошей идеей от слова совсем.

crarkie
() автор топика
Последнее исправление: crarkie (всего исправлений: 2)
Ответ на: комментарий от WitcherGeralt

Python-pseudo код того, что я примерно думаю сделать:

    def worker(tasks_queue):
        async def pending_tasks():
            while True:
                try:
                    await asyncio.sleep(1)
                    task_id = tasks_queue.get(False)
                except queue.Empty:
                    continue
                else:
                    loop.create_task(run_task(task_id))

        loop = asyncio.new_event_loop()
        loop.run_until_complete(pending_tasks())

    tasks_queue = Queue()
    executor = ProcessPoolExecutor(max_workers=NUM_WORKERS)
    for i in range(NUM_WORKERS):
        executor.submit(worker, tasks_queue)

    ...

    tasks = models.Task.object.filter(state=pending).all()
    for pending in tasks:
        tasks_queue.put(pending.task_id)
crarkie
() автор топика
Ответ на: комментарий от crarkie

Как вариант, по очереди на каждый воркер, ну и ещё какой-то контроллер, который задачки по очередям распихивает:

async def worker1(queue):
    while True:
        asyncio.sleep(1)
        task = await queue.get()

        await processing(task)
...


worker1_queue = asyncio.Queue()
w1 = asyncio.create_task(worker1(worker1_queue))

...

while condition:
    ...
    await worker1_queue.put(task1)
    await worker2_queue.put(task2)
    await worker3_queue.put(task3)

...
w1.cancel()
await w1
...
vvn_black ★★★★★
()
Последнее исправление: vvn_black (всего исправлений: 1)
Ответ на: комментарий от vvn_black

Как вариант, по очереди на каждый воркер, ну и ещё какой-то контроллер, который задачки по очередям распихивает:

Немного не совсем то. Но, как вариант, можно сделать очередь для каждого worker, да. В redis хранить для каждого worker текущее количество работающих задач. И в выбирая самый незанятый worker, добавлять к нему в очередь task.

crarkie
() автор топика
Ответ на: комментарий от crarkie

Не, не, я предлагаю в качестве воркера использовать весь монололит. Запускаешь десяток-другой копий сервиса каждый в своём процессе, разумеется, а над ними, допустим, nginx (или какой-нибудь сервер приложений, поддерживаюший aiohttp) занимается проксированием и балансировкой. Тебе только общий стейт, сесии например, нужно будет вынести куда-то во вне, в тот же редис.

WitcherGeralt ★★
()
Ответ на: комментарий от WitcherGeralt

Так дело в том, что мои задачи - это не просто API. Они постоянно делают какую-то работу. А не только по запросу через api. И добавляются/удаляются задачи тоже по наличию и статусу их в БД.

crarkie
() автор топика
Ответ на: комментарий от crarkie

Есть phpшный проект, в основном io-bound через обращения к разным базам и rest-api, но есть и куски логики обработки этих данных, не шибко интенсивные вычисления, работает через php-fpm, т.е. это процессы, не потоки, на ноде 8 ядер и до 100 процессов, работает норм. Из минусов - пулинг соединений по сути не будет работать. Так что самих по себе 150 io-bound процессов на 32 ядрах я бы не стал бояться. А вообще посмотрите во что на самом деле текущее решение упирается, может не в GIL, а в какой то пул соединений?

cobold ★★★★★
()

По-хорошему тут нужен пул. Из пула задачи берутся воркерами, работающими в отдельных процессах (ты правильно думаешь в сторону multiprocessing). В самих воркерах можно заюзать те самые потоки с GIL, либо более продвинутую концепцию - async/await из новой версии. Но это подходит только для i/o части.

Больше не знаю что сказать, подробностей нет. Нужно продумывать детали - как синхронизировать воркеры (придётся сериализовывать объекты для передачи данных в них), где конкретно нужен async/await и прочее.

InterVi ★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.