Для начала установим набор полезных функция для вывода данных от Дмитрия Родина
# набор полезных функций для вывода данных
!pip install -U --user git+https://github.com/madiedinro/rodin_helpers_py -q
import rodin_helpers as rh
!pip install -U --user arrow
!pip install -U --user python-dotenv
# Стандатные классы даты и времени
from datetime import datetime
# Модуль для с методами для итеративной обработки данных
from itertools import count
# Вывод данные в структурированном виде
from pprint import pprint
# Стандартный пакет для работы с json
import json
# альтернативный пакет для работы с данными
import arrow
# библиотека для работы с http
import requests
Записываем необходимые сведения в переменные:
amo_key
в настройках AmoCRM, домен и юзера вы знаете.amo_domain
и amo_user
вам известныПодготовим словарь, в котором будет храниться статус.Словарь используется т.к. из функции нелья установить переменную за ее границами. Редактировать словарь можно.
Берем в настройках ключ API, указывает другиме данные которые нам уже известны
amo_domain = 'https://randomint.amocrm.ru'
# логин AmoCRM
amo_user = 'randomint@armyspy.com'
# ключ API
amo_key = 'ea961728df7452688bcd8fe3cf8e3cb9e8c39e11'
today = arrow.now().format('YYYY-MM-DD')
state = {
'cookies': None
}
Поехали!
Подготовим функцию, которая принмает в аргументах логин и ключ. Дока по авторизации https://www.amocrm.ru/developers/content/api/auth
def auth(user, user_hash):
url = amo_domain + '/private/api/auth.php'
data = {
'USER_LOGIN': user,
'USER_HASH': user_hash
}
res = requests.post(url, data=data, params={'type':'json'})
print('status code', res.status_code)
if res.status_code == 200:
state['cookies'] = res.cookies
return res.json()
auth_result = auth(amo_user, amo_key)
pprint(auth_result)
Если 200 то все супер. Если нет - ищем ошибку
Запросим параметры аккаунта, где находится информация о номерах статусов. Дока по API аккаунта https://www.amocrm.ru/developers/content/api/account
res = requests.get(amo_domain + '/api/v2/account', params={'with': 'pipelines,groups,note_types,task_types'}, cookies=state['cookies'])
if res.status_code == 200:
data = res.json()
pprint(data)
А теперь попрбуем воспользоваться функцией из модуля rodin_helpers
, который мы загрузили в самом начале.
rh.walk(data, limit_list=1)
Не знаю как вам, а мне куда нагляднее. У меня номер статуса "Успешно реализовано" = 142
success_num = 142
При запросе можно передавать, лиды с какими статусами нам нужны. Мы выберем те, что находятся на завершительном этапе воронки
Слишком много информации. Создадим функцию которая будет превращать здоровенный объект сделки в небольшой и подходящий нам полезный объектик. Заодно избавимся от пути _embedded > items
.
orig_deals = data['_embedded']['items']
def format_deal(deal):
fields = {f['name'].lower(): f['values'][0].get('value') for f in deal['custom_fields']}
date = datetime.fromtimestamp(deal['closed_at'],)
return {
'id': deal['id'],
'uid': fields.get('uid') or '',
'cid': fields.get('cid') or '',
'sale': deal['sale'],
'date': date.strftime('%Y-%m-%d'),
'date_time': date.strftime('%Y-%m-%d %H:%M:%S'),
'account_id': deal['account_id']
}
deal = format_deal(orig_deals[0])
pprint(deal)
Так гораздо лучше. Применяем ко всем.
deals = [format_deal(d) for d in orig_deals]
Мы рассмотрим несколько вариантов, как это сделать.
Речь идет про библиотеку для работы с ClickHouse SimpleCH, детали можно найти в репозитории с библиотекой https://github.com/madiedinro/simple-clickhouse.
Возможности новые и экспериментальные, могут работать не во всех случаях. Но страшно удобные!
from simplech import ClickHouse
Создание экземпляра класса для работы с ClickHouse
ch = ClickHouse()
вызов ch.discovery создаст экземпляр TableDiscovery, где сразу будет произведена обработка имеющихся данных и подобрана схема хранения
td = ch.discover('deals', deals)
td
Видно, что все поля записались в dimensions. Надо только отметить какие из них метрики и выделить дату и первичный ключ
td.date('date').idx('account_id', 'date').metrics('sale')
Посмотреть какая схема хранения данных получилась
schema = td.merge_tree()
print(schema)
То что нужно!
ch.run("""
CREATE TABLE IF NOT EXISTS deals (
date Date,
date_time DateTime,
id UInt64,
account_id UInt64,
uid String,
cid String,
sale Int64
) ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (id, date) SETTINGS index_granularity=8192
""")
Тишина, значит все окей!
Продобнее о библиотеке https://github.com/madiedinro/simple-clickhouse
from simplech import ClickHouse
ch = ClickHouse()
with ch.table('deals') as c:
for deal in deals:
c.push(deal)
Что у нас вышло
rh.print_rows([*ch.objects_stream('SELECT * FROM deals LIMIT 10')])
print(ch.select('SHOW TABLES'))
Если дубликадов не удается избежать. Иструкция DISTINCT как раз этим занимается,но будте аккуратны, не должно попасть ни одна колонка где данные могут отличаться!
query = """
SELECT DISTINCT id, uid, cid, date
FROM deals
"""
data = [*ch.objects_stream(query)]
rh.print_rows(data)
Самый простой способ описать ее в конфиге chwriter, он создаст ее и в дальнейшем расширит, при необходимости. Для этого в Theia создайте файл config/custom/chwriter/amocrm.yml
с содержимым:
clickhouse:
tables:
amocrm_deals:
date: Date
date_time: DateTime
id: UInt64
uid: String
cid: String
sale: Int64
account_id: Int64
_options:
engine: MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (id, date) SETTINGS index_granularity=8192
В дальнейшем можно будет просто добавить необходимые колонки и они будут автоматически созданы при перезагрузке chwriter.
print(ch.select('SHOW TABLES'))
Самостоятельно
Это только начало пути. Многое можно сделать лучше, но перед этим надо озаботиться обработкой по датам. Поэтому вот вам задачка, поработать самостоятельно:
В дополнение содадим простенький дашборд в Grafana, показывающий кол-во завершившихся сделок по дням. Но чтобы было, что показывать давайте сгененрируем немного фейковых данных. Для этого нам потребуется генератор случайных чисел, импортируем его и сгенерируем необходимое кол-во сделок.
# Импортируем модуль генерации случайных чисел
from random import randint
# Сегодня
today = arrow.now()
# Сколько дней назад будем смещаться
use_days = 7
# Сколько сделок хоти
deals_amount = 15
# Список фэйков сделок
fake_deals = []
# при помощи среза (slice) берем только несколько записей для вывода, чтобы не засорять горизонт
pprint(fake_deals[:3])
# Пишем в кх
with ch.table('deals') as b:
for d in fake_deals:
b.push(d)
rh.print_rows([*ch.objects_stream('SELECT * FROM deals LIMIT 10')], limit=10)
Видео, как создавать панели с графиками в Grafana
Jupyter - это замечательный инструмент для написания кусочкой кода и отладки, но нам надо сделать инструмент, собираюзий данные по расписанию, а не блокнот. Поэтому превратим наш код в код сервиса.
Как зарегистрировать сервер и развернуть на нем Rockstat в части Zero
https://aiohttp.readthedocs.io/en/stable/client_quickstart.html
Пара статей и курс на эту тему. Асинхронность является специфической областью занинй для большинства python программиств
Через некоторое время запросы перестанут работать и будут выдавать
{'response': {'error': 'Неверный логин или пароль', 'error_code': '110', 'ip': '88.212.240.252', 'domain': 'wevdiw.amocrm.ru', 'server_time': 1547687325}}
Почему-то серверы АМО забывают сессию, и нужно опять опять авторизоваться.
Следует доработать функцию выполнения запросов, чтобы она обрабатывала ошибки авторизации и производила повторный запрос. Этот же алгоритм обработает ситуацию, когда нужно авторизоваться после перезапуска (потеряны значения переменных)
Далее функция, которая собрана из всего, что было ранее
def amo_query(path, params=None, data=None, json=None, method='get', attemps=2, auth_request=False):
# По-умолчанию высталено 2 прохождения цикла.
# На первом может произойти ошибка авторизации,
# за ней последует процесс авторизации.
# На втором проходе запрос будет выполнен с корретными привелегиями
for i in range(attemps):
# Стоим урл запроса и записываем в переменную
url = amo_domain + path
# базовые параметры, которые дописываются ко всем запросам
base_params = {'type':'json'}
# Выполнение запроса
res = requests.request(method, url, data=data, json=json, cookies=state['cookies'], params={**base_params, **(params or {})})
# если сервер вернул ответ что все ок
if res.status_code == 200:
# Сохраняем куки после успешной авторизации
if auth_request:
state['cookies'] = res.cookies
return res.json()
# В случае ошибки авторизации
elif res.status_code == 401:
# Если это не авторизационный запрос
if auth_request == False:
req_data = { 'USER_LOGIN': amo_user, 'USER_HASH': amo_key }
# Вызываем запрос авторизации
auth_result = amo_query('/private/api/auth.php', data=req_data, auth_request=True, method='post')
print(res.status_code, res.json())
amo_query('/api/v2/account')
Все работает. А теперь заменим библиотеку для выполнения запросов на aiohttp, она требуется для асинхронной работы (об этом потом).
Метод помечен async
. Для вызовы анинхронных методов используется слово await
.
В requests и aiohttp немного отличается работа с cookies, остальные параметры совпадают.
Возможно для кого-то сразу станте понятно, когда я скажу, что у asyncio точно такой же смысл, как и у node.js. В питоне есть несколько реализаций асинхронности, но самая быстрая (возможно и популярная в продакшн) это uvloop, которая кстати построена на той же самай библиотеке, что и node.js libuv Попробую показать наглядно в чем отличие.
import asyncio
# Примочка, чтобы asyncio корректно работал jupyter
import nest_asyncio
nest_asyncio.apply()
async def tick01():
for i in range(5):
await asyncio.sleep(0.1)
print(0.1)
return {'a': 111}
async def tick02():
for i in range(5):
await asyncio.sleep(0.2)
print(0.2)
return {'b': 222}
# Loop - корневой внутренний цикл, выполняющий поочередно все функции
loop = asyncio.get_event_loop()
# Все, что там выполняется это задачи
task = asyncio.gather(tick01(), tick02())
# Запускаем его до тех пор, пока задачи на закончатся
loop.run_until_complete(task)
Если присмотреться, то можно заметить что функции выполнялись параллельно, и потом вместе вернули результат. Но самое главное, это не многопоточность и нет проблема возникающих при использовании потоков. Асинхронность это когда в одном потоке функции выполнябтся частями. Переключение происзодит в момент ожидания await
. А учитывая что большую часть времени программы стоят в ожидании это открыло огромные возможности. Например Node.js это целиком асинхнорнный код, там крайне проблематично писать что-то синхронное, при этом один процесс может обработкать несколько тысяч запросов в секунду.
обычная функция выглядит так
def myfunc(a):
print(a)
и выполнять ее вот так
myfunc(1)
асинхронная функция
async def myfunc(a):
print(a)
и выполнять ее вот так
await myfunc(1)
В случае использования операторов контекста и/или генараторов вместо await
используеться async with
/ async for
. Не забивайте голову просто дописывайте и ориентируйтесь по примерам, сейчас не время в этом разбираться
В асинхронной стороне питоне не используюьт requests
, а используют aiohttp
. Аргументы практически идентичны, только появляется дополнительный слой сессии. Рассмотрим пример
res = requests.get(url, data=data, json=json, cookies=state['cookies'])
res.json()
Эквивалентом этого запроса будет
async with aiohttp.ClientSession(cookie_jar=state['cookies']) as sess:
async with sess.get(url, data=data, json=json) as res:
res = await res.json()
Да, запись получается немного шире, но
state = {'cookies': None}
import aiohttp
async def amo_query(path, params=None, data=None, json=None, method='get', attemps=2, auth_request=False):
# По-умолчанию высталено 2 прохождения цикла.
# На первом может произойти ошибка авторизации,
# за ней последует процесс авторизации.
# На втором проходе запрос будет выполнен с корретными привелегиями
for i in range(attemps):
# Стоим урл запроса и записываем в переменную
url = amo_domain + path
# базовые параметры, которые дописываются ко всем запросам
base_params = {'type':'json'}
# Выполнение запроса
async with aiohttp.ClientSession(cookie_jar=state['cookies']) as session:
async with session.request(method, url, data=data, json=json, params={**base_params, **(params or {})}) as res:
#res = requests.request()
# если сервер вернул ответ что все ок
if res.status == 200:
# Сохраняем куки после успешной авторизации
if auth_request:
state['cookies'] = session.cookie_jar
return await res.json()
# В случае ошибки авторизации
elif res.status == 401:
# Если это не авторизационный запрос
if auth_request == False:
req_data = { 'USER_LOGIN': amo_user, 'USER_HASH': amo_key }
# Вызываем запрос авторизации
auth_result = await amo_query('/private/api/auth.php', data=req_data, auth_request=True, method='post')
print(res.status, await res.json())
# Протестируем функцию
await amo_query('/api/v2/account', params={'status':success_num})
Преренесем код в сервис Rockstat, чтобы он выполнялся по расписанию. Но предварительно подготовим здесь небходимые функции.
Чтобы использовать систему презаписи надо сделать фиксированную конфигурацию, не зависящую от пришедших данных. Воспользуемся еще одним крайне полезным методом SimpleCH
code = td.pycode()
print(code)
Отлично, все работает! Теперь сделаем полноценный обработчик записи данных (точнее дозаписи - библиотека сравнивает что есть в БД с тем, что передается на обработку)
async def write_deals(deals):
td = ch.discover('deals', columns={'id': 'Int64', 'uid': 'Int64', 'cid': 'String', 'sale': 'Int64', 'date': 'Date', 'date_time': 'DateTime', 'account_id': 'Int64'}).metrics(*['sale']).dimensions(*['date_time', 'account_id', 'cid', 'uid', 'id', 'date']).date(*['date']).idx(*['account_id', 'date'])
today = arrow.now().format('YYYY-MM-DD')
async with td.difference('2019-01-01', today, deals) as d:
async for row in d:
td.push(row)
Соберем в одном месте конфигурацию и необходимые функции.
import arrow
from simplech import AsyncClickHouse
from datetime import datetime
# --- Параметря подключения к Amo
amo_domain = 'https://randomint.amocrm.ru'
# логин AmoCRM
amo_user = 'randomint@armyspy.com'
# ключ API
amo_key = 'ea961728df7452688bcd8fe3cf8e3cb9e8c39e11'
success_num = 142
state = {'cookies': None}
# Обатите внимание, вместо ClickHouse мы используем асинхронная AsyncClickHouse (неблокирующая версия) модуля
ch = AsyncClickHouse()
# Структура таблицы
td_deals = ch.discover('deals', columns={
'id': 'Int64',
'uid': 'Int64',
'cid': 'String',
'sale': 'Int64',
'date': 'Date',
'date_time': 'DateTime',
'account_id': 'Int64'
}).metrics(
'sale'
).date('date').idx('account_id', 'date')
def format_deal(deal):
fields = {f['name'].lower(): f['values'][0].get('value') for f in deal['custom_fields']}
date = datetime.fromtimestamp(deal['closed_at'],)
return {
'id': deal['id'],
'uid': fields.get('uid') or '',
'cid': fields.get('cid') or '',
'sale': deal['sale'],
'date': date.strftime('%Y-%m-%d'),
'date_time': date.strftime('%Y-%m-%d %H:%M:%S'),
'account_id': deal['account_id']
}
async def write_deals(deals, d1, d2):
# Создание таблицы
await td_deals.merge_tree(execute=True)
# Подсчет дельты записей
async with td_deals.difference(d1, d2, deals) as d:
async for row in d:
td_deals.push(row)
# Создадим функцию, которая будет производить весь цикл сбора данных, вызывая другие ф-ции
async def update_data():
d1 = '2019-01-01'
d2 = arrow.now().format('YYYY-MM-DD')
# TODO: Не забудьте сделать обработку дат при получении из апи и записи в БД!
deals_response = await amo_query('/api/v2/leads', params={'status': success_num})
deals = [format_deal(d) for d in deals_response['_embedded']['items']]
await write_deals(deals, d1, d2)
Не забудьте самостоятельно сделать обработку дат
# Произведем полный получения и сохранения данных
await update_data()
rh.print_rows([rec async for rec in ch.objects_stream('SELECT * FROM deals')], limit=10)
rh.print_rows([rec async for rec in ch.objects_stream('SELECT * FROM deals')], limit=10)
Все записалось, а также не записываются дубликаты при повторной обработке. Переносим все это в сервис
Ну все, сервис запущен, радуемся постоянно поступающему потоку данных.
На этом все. Ждите новых серий :)