'2' + '2' —> '22' текстовый формат.
При умножении текста на число, текст дублируется столько раз, чему равно число.
В нашем случае '22' * 3 —> '222222'
Если было бы, например 'hello' * 3 —> 'hellohellohello'
Pythoner - это канал в Telegram, который создан для любителей языка программирования Python. На канале @pythonercode вы найдете полезные материалы по Python, которые будут интересны как опытным разработчикам, так и начинающим. Здесь вы сможете найти информацию о последних тенденциях в мире Python, полезные советы по разработке, а также обзоры новых библиотек и инструментов.
Канал также предлагает возможность сотрудничества. Для этого вы можете обратиться к @flattys. Если у вас есть вопросы или предложения, менеджер канала @geralt54 всегда готов помочь.
Присоединяйтесь к каналу Pythoner и узнавайте много нового о Python. Канал также имеет свою страницу на бирже Telegram: https://telega.in/c/pythonercode
30 Jan, 15:06
29 Jan, 10:51
28 Jan, 13:23
27 Jan, 11:51
import streamlit as st
from PIL import Image
import model # ваша ML модель
st.title("🐕 Определитель пород собак")
uploaded_file = st.file_uploader("Загрузите фото собаки")
if uploaded_file:
image = Image.open(uploaded_file)
st.image(image, caption="Загруженное фото")
prediction = model.predict(image)
st.success(f"Это {prediction} с вероятностью 95%!")
import streamlit as st
import pandas as pd
import plotly.express as px
st.title("📊 Анализ продаж")
data = pd.read_csv("sales.csv")
# Интерактивный фильтр по датам
date_range = st.date_input("Выберите период")
# Динамический график
fig = px.line(data, x="date", y="sales")
st.plotly_chart(fig)
st.slider()
– и у вас уже есть полноценный интерактивный элемент управленияst.plotly_chart()
– и готово!pip install streamlit
streamlit hello # запустит демо-приложение
25 Jan, 12:54
Stan
.24 Jan, 18:18
from pybrain.datasets import ClassificationDataSet
from pybrain.supervised.trainers import BackpropTrainer
from pybrain.tools.shortcuts import BuildNetwork
# Создание выборки данных
data = ClassificationDataSet(2, 1)
# Добавление данных (x1, x2, label)
data.addSample((0, 0), (0,))
data.addSample((0, 1), (1,))
data.addSample((1, 0), (1,))
data.addSample((1, 1), (0,))
# Создание нейронной сети
network = BuildNetwork(2, 3, 1)
# Обучение нейронной сети
trainer = BackpropTrainer(network, data)
for _ in range(1000):
trainer.train()
# Проверка результатов
for sample in [(0, 0), (0, 1), (1, 0), (1, 1)]:
output = network.activate(sample)
print(f'Input: {sample}, Output: {output}')
24 Jan, 11:09
23 Jan, 16:21
22 Jan, 16:03
@click.command()
.help
сообщения.bash
/zsh
автодополнения.Click
широко используется для создания консольных утилит, CLI интерфейсов для web фреймворков, DevOps инструментов, скриптов автоматизации и других задач, где требуется командная строка.21 Jan, 16:18
set
и возвращает boolean значение — True
если первый set
содержит все элементы второго, и False
в противном случае. 20 Jan, 15:59
19 Jan, 17:02
Pyspark
автоматически распределяет данные и вычисления между узлами кластера для максимальной производительности.Pyspark
есть специальные типы данных (RDD, DataFrame, Dataset), которые позволяют удобно работать с табличными и структурированными данными.Pyspark
вместе с другими популярными библиотеками Python для анализа данных.Pyspark
используется для быстрой параллельной обработки больших объемов данных с помощью кластеров, что делает его очень полезным инструментом для big data и машинного обучения.17 Jan, 15:06
16 Jan, 16:36
15 Jan, 06:02
age = 15
type = "teenager" if age < 18 else "adult"
print(type) # Выведет: teenager
13 Jan, 15:12
Mapping
определяет интерфейс, общий для всех отображений ключ-значение, включая такие методы как keys()
, values()
, items()
и другие. Mapping
часто используется вместе с isinstance
или issubclass
для проверки, является ли объект словарем. Также он полезен при написании функций, которым нужно принимать на вход отображения, но без привязки к конкретному типу как dict
. Mapping
гарантирует наличие основных методов словаря у переданного объекта.12 Jan, 09:26
complex(a, b)
, где a
— действительная часть, b
— мнимая.a + bj
.complex(string)
.real
и imag
. abs()
, conjugate(),
polar()
позволяют получить модуль, сопряженное число и представление в тригонометрической форме.10 Jan, 16:57
09 Jan, 17:10
from peewee import *
# Создаем подключение к базе данных SQLite (или подключаемся к существующей)
db = SqliteDatabase('my_database.db')
# Определяем модель данных (таблицу)
class User(Model):
username = CharField(unique=True)
password = CharField()
email = CharField()
active = BooleanField(default=True)
joined_date = DateTimeField()
class Meta:
database = db # Связываем модель с базой данных
# Создаем таблицу в базе данных (если она не существует)
db.connect()
db.create_tables([User])
# Создаем нового пользователя
user = User(username='testuser', password='password', email='[email protected]', joined_date='2024-07-26 10:00:00')
user.save() # Сохраняем пользователя в базу данных
# Получаем пользователя по имени пользователя
retrieved_user = User.get(User.username == 'testuser')
print(retrieved_user.email) # Вывод: [email protected]
# Обновляем данные пользователя
retrieved_user.active = False
retrieved_user.save()
# Удаляем пользователя
retrieved_user.delete_instance()
# Запрос с фильтрацией
active_users = User.select().where(User.active == True)
for user in active_users:
print(user.username)
db.close()
User
, подключение к базе данных SQLite, создание таблицы, добавление, получение, обновление и удаление записей. Также показан пример простого запроса с фильтрацией. peewee предоставляет интуитивно понятный интерфейс для работы с базами данных, делая код более читаемым и легким в поддержке, чем при использовании сырых SQL-запросов.08 Jan, 15:48
06 Jan, 19:33
from boltons.iterutils import chunked
data = list(range(20))
chunks = list(chunked(data, 4)) # Разделение данных на чанки размером 4
print(chunks)
# Вывод: [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15], [16, 17, 18, 19]]
from boltons.dictutils import OMD
# Ordered Multi Dict, сохраняет порядок добавления ключей и позволяет иметь несколько значений для одного ключа
omd = OMD()
omd[1] = "a"
omd[2] = "b"
omd[1] = "c"
print(omd)
# Вывод: OMD([(1, 'a'), (2, 'b'), (1, 'c')])
print(omd.getlist(1)) # Получение всех значений для ключа 1
# Вывод: ['a', 'c']
from boltons.funcutils import wraps
def my_decorator(f):
@wraps(f) # Сохраняет метаданные оригинальной функции
def wrapper(*args, **kwargs):
print("Before function call")
result = f(*args, **kwargs)
print("After function call")
return result
return wrapper
@my_decorator
def my_function(a, b):
"""
This is my function.
"""
return a + b
print(my_function(1, 2))
# Вывод:
# Before function call
# After function call
# 3
print(my_function.__doc__) # Метаданные сохраняются благодаря wraps
# Вывод: This is my function.
chunked
для разделения списка на чанки, OMD
(Ordered Multi Dict) для работы со словарем, сохраняющим порядок и позволяющим иметь несколько значений для одного ключа, и wraps
для создания декораторов, сохраняющих метаданные декорируемых функций. Эти примеры показывают лишь небольшую часть функциональности, предоставляемой boltons. Библиотека содержит множество других полезных инструментов для различных задач.05 Jan, 17:35
import pandas as pd
на import modin.pandas as pd
, можно значительно ускорить выполнение многих операций, таких как чтение данных, фильтрация, агрегация и другие.# !pip install modin[ray] # или modin[dask], если предпочитаете Dask
import time
import modin.pandas as pd # Замена import pandas as pd
start_time = time.time()
# Загрузка большого датасета (замените на свой путь к файлу)
df = pd.read_csv("large_dataset.csv")
# Выполнение некоторых операций с DataFrame
# ... (например, фильтрация, агрегация и т.д.) ...
df = df[df['column_name'] > 100]
df = df.groupby(['column_name1','column_name2'])['column_name3'].sum()
end_time = time.time()
print(f"Время выполнения с Modin: {end_time - start_time} секунд")
# Для сравнения, выполните тот же код с обычным Pandas:
import time
import pandas as pd
start_time = time.time()
# Загрузка большого датасета (замените на свой путь к файлу)
df = pd.read_csv("large_dataset.csv")
# Выполнение некоторых операций с DataFrame
# ... (например, фильтрация, агрегация и т.д.) ...
df = df[df['column_name'] > 100]
df = df.groupby(['column_name1','column_name2'])['column_name3'].sum()
end_time = time.time()
print(f"Время выполнения с Pandas: {end_time - start_time} секунд")
modin.pandas
, а затем с обычным pandas
, чтобы сравнить время выполнения. Для больших датасетов Modin может значительно сократить время обработки, автоматически распараллеливая вычисления. Убедитесь, что у вас установлен Ray или Dask (в зависимости от выбранного движка для Modin) и замените "large_dataset.csv"
на путь к вашему большому CSV-файлу.04 Jan, 15:56
03 Jan, 17:30
HelloWorld
, который выглядит следующим образом:public class HelloWorld {
public static String greeting() {
return "Hello from Java!";
}
}
greeting
:import jpype
# Запуск виртуальной машины Java
jpype.startJVM(jpype.getDefaultJVMPath())
# Загрузка класса
HelloWorld = jpype.JClass("HelloWorld")
# Вызов метода
result = HelloWorld.greeting()
print(result) # Вывод: Hello from Java!
# Остановка виртуальной машины Java
jpype.shutdownJVM()
02 Jan, 11:24
def calculate_area(length, width):
return length * width
# Использование функции
room_area = calculate_area(5, 4)
print(f"Площадь комнаты: {room_area} кв.м.")
class BankAccount:
def __init__(self, balance):
self.balance = balance
def deposit(self, amount):
self.balance += amount
return f"Новый баланс: {self.balance}"
# Использование метода
account = BankAccount(1000)
account.deposit(500) # Вызов метода через объект
01 Jan, 07:07
30 Dec, 19:51
from datasets import load_dataset
# Загрузка датасета "imdb" с Hugging Face Hub
dataset = load_dataset("imdb")
print(dataset)
# Вывод: DatasetDict({
# train: Dataset({
# features: ['text', 'label'],
# num_rows: 25000
# })
# test: Dataset({
# features: ['text', 'label'],
# num_rows: 25000
# })
# unsupervised: Dataset({
# features: ['text', 'label'],
# num_rows: 50000
# })
# })
train_data = dataset['train']
print(train_data[0]) # Доступ к первому примеру в обучающем наборе
# Вывод: {'text': 'A series of escapades demonstrating the adage that what is good for the goose is also good for the gander, some of which occasionally amuses but none of which amounts to much of a story.', 'label': 0}
small_train_dataset = dataset["train"].shuffle(seed=42).select(range(1000)) # Перемешивание и выборка
from datasets import ClassLabel
import random
import pandas as pd
def show_random_elements(dataset, num_examples=10):
picks = []
n = len(dataset)
for _ in range(num_examples):
pick = random.randint(0, n - 1)
picks.append(dataset[pick])
df = pd.DataFrame(picks)
if isinstance(dataset.features['label'], ClassLabel):
df['label'] = df['label'].apply(lambda x: dataset.features['label'].int2str(x))
print(df)
show_random_elements(small_train_dataset)
# Вывод: таблица с 10 случайными примерами
load_dataset
. Выводится информация о структуре датасета и показан доступ к отдельным примерам. Также представлен пример перемешивания и выборки подмножества данных для создания меньшего обучающего набора. Функция show_random_elements
демонстрирует удобный способ просмотра случайных элементов из датасета в формате таблицы, обрабатывая при этом ClassLabel
для отображения понятных названий меток.30 Dec, 16:31
from transformers import pipeline
classifier = pipeline("sentiment-analysis") # Загрузка предобученной модели для анализа тональности текста
results = classifier([
"I love this library!",
"This is a terrible movie.",
"This is a neutral statement."
])
for result in results:
print(result)
# Примерный вывод:
# [{'label': 'POSITIVE', 'score': 0.9998950958251953}]
# [{'label': 'NEGATIVE', 'score': 0.9991175532341003}]
# [{'label': 'NEGATIVE', 'score': 0.9865201115608215}] # Модель может ошибаться, особенно на нейтральных высказываниях
from transformers import pipeline
generator = pipeline('text-generation', model='gpt2') # Загрузка модели для генерации текста
text = generator("Once upon a time, there was a large language model.", max_length=50, num_return_sequences=2)
for generated_text in text:
print(generated_text['generated_text'])
# Примерный вывод (будет отличаться при каждом запуске):
# Once upon a time, there was a large language model. It was trained on a massive dataset of text and code, and it could generate text, translate languages, write different kinds of creative content, and answer your questions in an informative way.
# Once upon a time, there was a large language model. And he lived in a little house made of straw. One day, he was sitting in his house, reading a book
from transformers import pipeline
summarizer = pipeline("summarization")
text = """
The quick brown fox jumps over the lazy dog. This is a test sentence.
It is used to demonstrate text summarization. The fox is brown and quick.
The dog is lazy. Summarization is a useful NLP task.
"""
summary = summarizer(text, max_length=30, min_length=10, do_sample=False)
print(summary[0]['summary_text'])
# Примерный вывод:
# The quick brown fox jumps over the lazy dog. It is used to demonstrate text summarization. The fox is brown and quick. The dog is lazy.
pipeline
для создания готовых к использованию объектов для анализа тональности текста, генерации текста и суммаризации. pipeline
автоматически загружает необходимые модели и токенизаторы. Обратите внимание, что для первого запуска потребуется скачать предобученные модели, что может занять некоторое время. Также показаны примеры настройки параметров генерации и суммаризации.29 Dec, 11:15
import plotly.express as px
import pandas as pd
# Создаём тестовые данные
df = pd.DataFrame({
'Месяц': ['Янв', 'Фев', 'Март', 'Апр', 'Май'],
'Продажи': [100, 150, 200, 180, 250],
'Прибыль': [20, 30, 40, 35, 50]
})
# Создаём интерактивный график
fig = px.line(df, x='Месяц', y=['Продажи', 'Прибыль'],
title='Динамика продаж и прибыли',
template='plotly_dark')
# Добавляем hover-эффекты
fig.update_traces(mode='lines+markers', hovertemplate='%{y:,.0f}₽')
# Сохраняем как HTML или показываем в браузере
fig.write_html('sales_dashboard.html')
from bokeh.plotting import figure, show
from bokeh.layouts import column
from bokeh.models import ColumnDataSource, HoverTool
import numpy as np
# Создаём интерактивный scatter plot
x = np.random.normal(0, 1, 1000)
y = np.random.normal(0, 1, 1000)
source = ColumnDataSource(data=dict(
x=x,
y=y,
size=np.random.uniform(5, 15, 1000),
color=['#%06x' % np.random.randint(0, 0xFFFFFF) for _ in range(1000)]
))
p = figure(width=800, height=600, title='Интерактивный Scatter Plot')
p.scatter('x', 'y', size='size', color='color', alpha=0.6, source=source)
# Добавляем интерактивные подсказки
hover = HoverTool(tooltips=[
('X', '@x{0.000}'),
('Y', '@y{0.000}'),
('Размер', '@size{0.00}')
])
p.add_tools(hover)
show(p)
import plotly.graph_objects as go
fig = go.Figure(
data=[go.Scatter(x=[1, 2, 3], y=[1, 3, 2])],
layout=dict(
updatemenus=[dict(
type="buttons",
buttons=[dict(label="Play",
method="animate",
args=[None])]
)]
)
)
from bokeh.plotting import curdoc
from functools import partial
from tornado.ioloop import IOLoop
def update():
source.data['y'] = np.random.rand(100)
curdoc().add_periodic_callback(update, 100) # Обновление каждые 100мс
from dataclasses import dataclass
from typing import List
@dataclass
class DataPoint:
x: float
y: float
category: str
data_points: List[DataPoint] = [] # Эффективнее, чем DataFrame для больших данных
27 Dec, 13:57
import torch
import torch.nn as nn
class SimpleNet(nn.Module):
def __init__(self):
super().__init__()
self.layer1 = nn.Linear(784, 128)
self.relu = nn.ReLU()
self.layer2 = nn.Linear(128, 10)
def forward(self, x):
x = self.layer1(x)
x = self.relu(x)
return self.layer2(x)
Создаем модель одной строчкой! 🎯
model = SimpleNet().to('cuda' if torch.cuda.is_available() else 'cpu')
import tensorflow as tf
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
tf.keras.layers.Dense(10)
])
# Компилируем модель
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# PyTorch стиль
optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
for batch_x, batch_y in dataloader:
optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
# Разбиваем данные
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Следим за метриками
val_loss = []
for epoch in range(epochs):
model.train()
# ... обучение ...
model.eval()
with torch.no_grad():
val_predictions = model(X_val)
v_loss = criterion(val_predictions, y_val)
val_loss.append(v_loss.item())
26 Dec, 19:38
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
code = 'print("Hello, world!")'
highlighted_code = highlight(code, PythonLexer(), HtmlFormatter())
print(highlighted_code)
# <div class="highlight"><pre><span></span><span class="nb">print</span><span class="p">(</span><span class="s2">"Hello, world!"</span><span class="p">)</span>
# </pre></div>
highlight
, PythonLexer
и HtmlFormatter
. Затем определяется исходный код, который нужно подсветить. Функция highlight
принимает код, лексер (в данном случае PythonLexer
для Python) и форматтер (в данном случае HtmlFormatter
для HTML) и возвращает подсвеченный код.26 Dec, 19:30
click
и предоставляет простой и интуитивно понятный способ определения команд, аргументов и опций.import typer
app = typer.Typer()
@app.command()
def hello(name: str):
"""
Простое приветствие.
"""
print(f"Привет, {name}!")
@app.command()
def goodbye(name: str, formal: bool = False):
"""
Прощание. Можно сделать формальным.
"""
if formal:
print(f"До свидания, {name}!")
else:
print(f"Пока, {name}!")
if __name__ == "__main__":
app()
python main.py --help
, вы получите автоматически сгенерированную справку по использованию приложения.25 Dec, 19:49
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Инициализация драйвера Chrome (убедитесь, что ChromeDriver установлен и находится в PATH)
driver = webdriver.Chrome()
try:
# Открытие страницы
driver.get("https://www.example.com")
# Явное ожидание элемента с ID "my-element" (до 10 секунд)
element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "my-element"))
)
# Вывод текста элемента
print(element.text)
finally:
# Закрытие браузера
driver.quit()
25 Dec, 15:32
24 Dec, 19:59
import asyncio
import aioredis
async def redis_example():
try:
# Подключение к Redis
redis = await aioredis.create_redis_pool(
'redis://localhost',
db=0,
encoding='utf-8', # Декодирование в UTF-8
minsize=5, # Минимальное количество соединений в пуле
maxsize=10 # Максимальное количество соединений в пуле
)
# Работа со строками
await redis.set('name', 'John Doe')
name = await redis.get('name')
print(f"Name: {name}") # Вывод: Name: John Doe
# Работа со списками
await redis.rpush('my_list', 'one', 'two', 'three')
list_length = await redis.llen('my_list')
list_items = await redis.lrange('my_list', 0, -1)
print(f"List length: {list_length}") # Вывод: List length: 3
print(f"List items: {list_items}") # Вывод: List items: ['one', 'two', 'three']
# Работа с хэшами
await redis.hset('user:1', 'name', 'Alice')
await redis.hset('user:1', 'age', 30)
user_data = await redis.hgetall('user:1')
print(f"User data: {user_data}") # Вывод: User data: {'name': 'Alice', 'age': '30'}
# Работа с множествами
await redis.sadd('my_set', 'apple', 'banana', 'cherry')
set_members = await redis.smembers('my_set')
print(f"Set members: {set_members}") # Вывод: Set members: {'apple', 'banana', 'cherry'}
# Использование pipeline
async with redis.pipeline(transaction=True) as pipe: # transaction=True для атомарности операций
await pipe.set('key1', 'value1')
await pipe.get('key1')
await pipe.hset('user:2', 'name', 'Bob')
results = await pipe.execute()
print(f"Pipeline results: {results}")
redis.close()
await redis.wait_closed()
except aioredis.RedisError as e:
print(f"Ошибка Redis: {e}")
if __name__ == "__main__":
asyncio.run(redis_example())
aioredis.create_redis_pool
, работа с различными типами данных (строки, списки, хэши, множества), и использование pipeline для пакетной обработки команд в транзакции. encoding='utf-8'
в параметрах подключения обеспечивает декодирование в UTF-8. minsize
и maxsize
управляют размером пула соединений. Обработка исключения aioredis.RedisError
помогает выявить проблемы с Redis. Важно запускать асинхронный код внутри asyncio.run
.24 Dec, 16:26
from tqdm import tqdm
import time
# Простой пример с range
for i in tqdm(range(100)):
time.sleep(0.02) # Имитация длительной операции
# С ручным обновлением
with tqdm(total=100) as pbar:
for i in range(10):
time.sleep(0.1)
pbar.update(10) # Обновление индикатора на 10 единиц
# С описанием
for i in tqdm(range(100), desc="Обработка данных"):
time.sleep(0.02)
# Вложенные циклы
from tqdm import trange
for i in trange(4, desc="1st loop"):
for j in trange(5, desc="2nd loop", leave=False):
for k in trange(50, desc="3rd loop", leave=False):
time.sleep(0.001)
# С pandas
import pandas as pd
df = pd.DataFrame({'a': range(10000)})
# Применение tqdm к pandas DataFrame (apply с progress_apply)
tqdm.pandas() # Необходимо для progress_apply
df['b'] = df['a'].progress_apply(lambda x: x*x) # Индикатор выполнения для apply
# Пример с файлами
import os
with open("large_file.txt", "w") as f:
for i in trange(10000, desc="Создание файла"):
f.write("This is a line of text.\n")
with open("large_file.txt", "r") as f:
for line in tqdm(f, total=os.path.getsize("large_file.txt"), unit="B", unit_scale=True, unit_divisor=1024, desc="Чтение файла"):
# Обработка каждой строки файла
pass
range
, ручным обновлением, описанием, вложенными циклами, интеграцией с pandas DataFrame с помощью progress_apply
и обработкой файлов. Обратите внимание на использование tqdm.pandas()
для работы с pandas. Пример с файлами показывает, как отображать прогресс чтения/записи с учетом размера файла.23 Dec, 19:39
httpx
. В Python httpx предоставляет мощный и удобный асинхронный интерфейс для работы с HTTP-запросами. Асинхронный режим позволяет эффективно обрабатывать множество запросов параллельно, не блокируя основной поток выполнения, что особенно полезно при взаимодействии с медленными или перегруженными серверами.import asyncio
import httpx
async def fetch_page(client: httpx.AsyncClient, url: str):
try:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return {"url": url, "status_code": response.status_code, "content": response.text}
except httpx.HTTPError as exc:
return {"url": url, "error": f"HTTP Error: {exc}"}
except httpx.TimeoutException:
return {"url": url, "error": "Request timed out"}
except Exception as e:
return {"url": url, "error": f"An unexpected error occurred: {e}"}
async def main():
urls = [
"https://www.example.com",
"https://www.google.com",
"https://www.python.org",
"https://this-is-an-invalid-url.com", # Пример некорректного URL
"https://jsonplaceholder.typicode.com/todos/1" # Пример API
]
async with httpx.AsyncClient(http2=True) as client: # Использование HTTP/2
tasks = [fetch_page(client, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
async def make_requests_with_limits(urls, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_semaphore(url):
async with semaphore:
return await fetch_page(httpx.AsyncClient(), url)
async with httpx.AsyncClient() as client:
tasks = [fetch_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
if __name__ == "__main__":
asyncio.run(main())
# Пример с ограничением числа одновременных запросов
asyncio.run(make_requests_with_limits(["https://www.example.com"] * 10))
fetch_page
теперь принимает httpx.AsyncClient
в качестве аргумента, что позволяет использовать один клиент для всех запросов внутри main
. Это способствует более эффективному использованию соединений. Добавлен пример с некорректным URL и API-запросом для демонстрации обработки ошибок и различных типов ответов. Также показано, как использовать HTTP/2 с помощью httpx.AsyncClient(http2=True)
. Дополнительно, представлен пример make_requests_with_limits
с использованием asyncio.Semaphore
для ограничения количества одновременных запросов, что позволяет контролировать нагрузку на сервер и предотвращать ошибки, связанные с превышением лимитов.22 Dec, 15:27
Path
.from pathlib import Path
# Создание объекта Path
file_path = Path("my_file.txt")
directory_path = Path("my_directory")
# Проверка существования файла/директории
if file_path.exists():
print(f"Файл {file_path} существует")
else:
print(f"Файл {file_path} не существует")
if directory_path.exists():
print(f"Директория {directory_path} существует")
if directory_path.is_dir():
print(f"Это директория")
# Создание директории
directory_path.mkdir(parents=True, exist_ok=True) # parents=True создает все родительские директории при необходимости, exist_ok=True предотвращает ошибку, если директория уже существует
# Создание файла и запись в него
file_path.write_text("Hello, pathlib!")
# Чтение из файла
file_content = file_path.read_text()
print(f"Содержимое файла: {file_content}")
# Переименование файла
new_file_path = Path("my_new_file.txt")
file_path.rename(new_file_path)
# Удаление файла
new_file_path.unlink()
# Перебор файлов в директории
for file in directory_path.iterdir():
print(file)
# Получение абсолютного пути
absolute_path = directory_path.resolve()
print(f"Абсолютный путь: {absolute_path}")
# Создание вложенных директорий и файла
nested_dir = Path("my_directory/nested/deeper")
nested_dir.mkdir(parents=True, exist_ok=True)
nested_file = nested_dir / "nested_file.txt" # / используется для объединения путей
nested_file.write_text("Content in nested file")
import shutil
# Удаление директории и ее содержимого
shutil.rmtree(directory_path) # rmtree из shutil используется для удаления непустых директорий
Path
, проверяется существование файлов и директорий, создаются и удаляются файлы и директории, производится чтение и запись данных, переименование файлов, перебор файлов в директории и получение абсолютного пути. Также показан пример создания вложенных директорий и файла внутри них с помощью оператора /
. И наконец, показано, как удалить директорию и ее содержимое с помощью shutil.rmtree
.21 Dec, 12:39
# Создать новое окружение с Python 3.9
conda create -n my_env python=3.9
# Активировать окружение
conda activate my_env
# Установить пакеты в активное окружение
conda install numpy pandas scipy
# Просмотреть список установленных пакетов
conda list
# Деактивировать окружение
conda deactivate
# Удалить окружение
conda remove -n my_env --all
# Создать окружение из файла environment.yml
conda env create -f environment.yml
# Экспортировать окружение в файл environment.yml
conda env export > environment.yml
# Обновить все пакеты в активном окружении
conda update --all
# Поиск пакета
conda search numpy
environment.yml
, экспортировать текущее окружение в файл environment.yml
, обновлять все пакеты в окружении и искать нужный пакет. Использование виртуальных окружений с conda — лучшая практика для управления зависимостями проектов и предотвращения конфликтов.20 Dec, 14:24
def test_sum():
assert sum([1, 2, 3]) == 6
assert sum([-1, 1]) == 0
from hypothesis import given
import hypothesis.strategies as st
@given(st.lists(st.integers()))
def test_sum_properties(numbers):
assert sum(numbers) == sum(reversed(numbers))
assert sum(numbers + [0]) == sum(numbers)
19 Dec, 19:36
import time
def compute_intensive_task(n):
result = 0
for i in range(n):
result += i * i
return result
start_time = time.time()
result = compute_intensive_task(10000000) # CPU-bound задача
end_time = time.time()
print(f"Результат: {result}")
print(f"Время выполнения: {end_time - start_time} секунд")
# Для запуска с pypy3: pypy3 example.py
19 Dec, 14:27
import numpy as np
# Создание массивов:
a = np.array([1, 2, 3]) # Одномерный массив
b = np.array([[1, 2, 3], [4, 5, 6]]) # Двумерный массив
c = np.zeros((2, 3)) # Массив из нулей
d = np.ones((3, 2)) # Массив из единиц
e = np.arange(10) # Массив с числами от 0 до 9
f = np.linspace(0, 1, 5) # Массив из 5 чисел, равномерно распределенных от 0 до 1
g = np.random.rand(2, 2) # Массив случайных чисел от 0 до 1
print("a:", a)
print("b:", b)
print("c:", c)
print("d:", d)
print("e:", e)
print("f:", f)
print("g:", g)
# Математические операции:
print("a + 2:", a + 2)
print("a * 3:", a * 3)
print("a.sum():", a.sum()) # Сумма элементов массива
print("b.mean():", b.mean()) # Среднее значение элементов массива
print("a.dot(a): ", a.dot(a)) # Скалярное произведение векторов
# Индексация и срезы:
print("a[0]:", a[0])
print("b[1, 2]:", b[1, 2])
print("e[1:4]:", e[1:4])
# Изменение формы массива:
print("b.reshape(3, 2):", b.reshape(3, 2))
print("a.T:", a.T) # Транспонирование массива
# Линейная алгебра:
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
print("A.dot(B):", A.dot(B)) # Умножение матриц
print("np.linalg.inv(A):", np.linalg.inv(A)) # Обратная матрица
18 Dec, 18:23
os
и упрощает выполнение распространенных операций, таких как копирование, перемещение, удаление, архивирование и работа с файловыми путями.import shutil
import os
# Копирование файла:
shutil.copy("source_file.txt", "destination_folder/") # Копирование в директорию
shutil.copy("source_file.txt", "destination_file.txt") # Копирование с новым именем
# Копирование директории (рекурсивно):
shutil.copytree("source_folder", "destination_folder")
# Перемещение файла или директории:
shutil.move("source_file.txt", "destination_folder/")
shutil.move("source_folder", "destination_folder")
# Удаление файла:
os.remove("file_to_delete.txt")
# Удаление директории (рекурсивно):
shutil.rmtree("directory_to_delete")
# Создание архива:
shutil.make_archive("archive_name", "zip", "folder_to_archive") # ZIP архив
shutil.make_archive("archive_name", "gztar", "folder_to_archive") # tar.gz архив
shutil.make_archive("archive_name", "bztar", "folder_to_archive") # tar.bz2 архив
shutil.make_archive("archive_name", "xztar", "folder_to_archive") # tar.xz архив
# Работа с файловыми путями (некоторые примеры):
print(shutil.which("python")) # Путь к исполняемому файлу python
# Проверка дискового пространства
total, used, free = shutil.disk_usage("/") # Получение информации о дисковом пространстве (корневой раздел)
print(f"Total: {total // (2**30)} GiB")
print(f"Used: {used // (2**30)} GiB")
print(f"Free: {free // (2**30)} GiB")
shutil
для копирования файлов и директорий, перемещения, удаления, создания архивов различных форматов, а также примеры работы с файловыми путями и проверки дискового пространства. Обратите внимание, что операции с файлами и директориями могут привести к безвозвратной потере данных, поэтому будьте осторожны при их использовании. Перед выполнением операций на реальных данных рекомендуется протестировать код на тестовых файлах и директориях. Также показан пример использования shutil.which
для определения пути к исполняемому файлу и shutil.disk_usage
для получения информации о дисковом пространстве.18 Dec, 14:00
import click
@click.group()
def cli():
"""Утилита для работы с файлами"""
pass
@cli.command()
@click.argument('path')
@click.option('--lines', '-l', is_flag=True, help='Показать количество строк')
def analyze(path, lines):
"""Анализирует файл и выводит статистику"""
try:
with open(path) as f:
content = f.readlines()
if lines:
click.echo(f'Количество строк: {len(content)}')
except FileNotFoundError:
click.secho('Файл не найден! 😱', fg='red')
if __name__ == '__main__':
cli()
@cli.command()
@click.argument('path')
def process(path):
"""Обрабатывает файлы с прогресс-баром"""
files = os.listdir(path)
with click.progressbar(files, label='Обработка файлов') as bar:
for f in bar:
# что-то делаем с файлом
time.sleep(0.1)
click.secho('Внимание! 🚨', fg='yellow', bold=True)
click.secho('Ошибка! ❌', fg='red')
click.secho('Успех! ✅', fg='green')
if click.confirm('Уверены, что хотите удалить все файлы? 🤔'):
click.echo('Поехали! 🚀')
17 Dec, 15:38
concurrent.futures
особенно полезно при работе с I/O-bound задачами, такими как сетевые запросы, чтение/запись файлов, взаимодействие с базами данных. Он также может быть эффективен для CPU-bound задач, если они могут быть разделены на независимые подзадачи.concurrent.futures
с ThreadPoolExecutor
:import concurrent.futures
import time
import requests
URLS = [
"https://www.example.com",
"https://www.google.com",
"https://www.python.org",
"https://www.wikipedia.org",
]
def load_url(url, timeout):
try:
response = requests.get(url, timeout=timeout)
return f"{url}: {len(response.content)} bytes"
except requests.exceptions.RequestException as e:
return f"{url}: Error: {e}"
start_time = time.time()
# Последовательное выполнение:
for url in URLS:
print(load_url(url, timeout=60))
end_time = time.time()
print(f"Последовательное выполнение заняло: {end_time - start_time} секунд")
start_time = time.time()
# Асинхронное выполнение с ThreadPoolExecutor:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# map() возвращает результаты в порядке URLS
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(f"{url}: generated an exception: {exc}")
else:
print(data)
end_time = time.time()
print(f"Асинхронное выполнение заняло: {end_time - start_time} секунд")
ThreadPoolExecutor
. Функция load_url
загружает данные по указанному URL. ThreadPoolExecutor
создает пул потоков, которые выполняют загрузку URL-адресов параллельно. as_completed
позволяет обрабатывать результаты по мере их готовности, не дожидаясь завершения всех задач. Сравнение времени выполнения показывает преимущество асинхронного подхода, особенно при работе с сетевыми запросами.16 Dec, 19:29
difflib
разнообразно: от сравнения версий файлов и поиска плагиата до создания патчей и слияния изменений в системах контроля версий. Он также может быть полезен для анализа текстовых данных, например, для выявления изменений в документах или сравнения ответов пользователей.difflib
для сравнения двух строк:import difflib
text1 = """
This is the first line.
This is the second line.
This is the third line.
"""
text2 = """
This is the first line.
This is the modified second line.
This is the fourth line.
"""
diff = difflib.ndiff(text1.splitlines(), text2.splitlines())
print("\n".join(diff))
# Вывод:
# This is the first line.
# - This is the second line.
# + This is the modified second line.
# ? +++++++++
#
# - This is the third line.
# ? ---
#
# + This is the fourth line.
# ? ++++
# ^
# Более наглядное сравнение:
diff = difflib.unified_diff(text1.splitlines(), text2.splitlines(), fromfile='file1.txt', tofile='file2.txt')
print("\n".join(diff))
# Вывод:
# --- file1.txt
# +++ file2.txt
# @@ -1,4 +1,4 @@
#
# This is the first line.
# -This is the second line.
# +This is the modified second line.
# -This is the third line.
# +This is the fourth line.
# Сравнение HTML:
from difflib import HtmlDiff
html_diff = HtmlDiff()
table = html_diff.make_table(text1.splitlines(), text2.splitlines(), fromdesc='file1.txt', todesc='file2.txt')
with open('diff.html', 'w') as f:
f.write(table) # Сохранение результата в HTML файл
ndiff
и unified_diff
для сравнения двух строк построчно. ndiff
показывает различия с помощью специальных символов: -
для удаленных строк, +
для добавленных и ?
для изменений внутри строки. unified_diff
предоставляет более компактный вывод, похожий на формат патчей. Также показан пример использования HtmlDiff
для создания HTML-таблицы с различиями, что удобно для визуального сравнения.16 Dec, 14:23
from fastapi import FastAPI, Query, Path, HTTPException
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI(title="My Awesome API", version="1.0.0", description="Example FastAPI application")
class Item(BaseModel):
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
@app.get("/") # GET-запрос на корневой путь
async def root():
return {"message": "Hello World"}
@app.get("/items/{item_id}") # GET-запрос с параметром пути
async def read_item(item_id: int = Path(..., title="The ID of the item to get", ge=1)): # Валидация параметра пути
return {"item_id": item_id}
@app.get("/items/")
async def read_items(q: Optional[str] = Query(None, min_length=3, max_length=50)): # Валидация query-параметра
results = {"items": [{"item_id": "foo"}, {"item_id": "bar"}]}
if q:
results.update({"q": q})
return results
@app.post("/items/", response_model=Item) # POST-запрос с валидацией тела запроса
async def create_item(item: Item):
return item
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
return {"item_id": item_id, **item.dict()}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Пример обработки ошибок
if user_id == 404:
raise HTTPException(status_code=404, detail="User not found")
return {"user_id": user_id}
# Запуск приложения
uvicorn main:app --reload
HTTPException
. Для запуска приложения используйте команду uvicorn main:app --reload
, где main
— имя файла, а app
— имя экземпляра FastAPI. После запуска перейдите по адресу /docs
в браузере, чтобы увидеть автоматически сгенерированную интерактивную документацию.15 Dec, 14:11
@dataclass
, который автоматически генерирует шаблонный код для классов, преимущественно предназначенных для хранения данных. Это упрощает создание классов с атрибутами, методами __init__
, __repr__
, __eq__
и другими, без необходимости писать их вручную.from dataclasses import dataclass, field
@dataclass(order=True) # order=True добавляет методы сравнения
class Person:
name: str
age: int
city: str = "Unknown" # Значение по умолчанию
email: str = field(default="[email protected]", repr=False) # repr=False скрывает поле в __repr__
scores: list[int] = field(default_factory=list) # mutable default value
person1 = Person("Alice", 30, "New York", scores=[100, 90])
person2 = Person("Bob", 25, "Los Angeles")
person3 = Person("Charlie", 35, scores=[80, 85])
person4 = Person("Alice", 30, "New York", scores=[100, 90])
print(person1) # Вывод: Person(name='Alice', age=30, city='New York', scores=[100, 90])
print(person2) # Вывод: Person(name='Bob', age=25, city='Los Angeles', scores=[])
print(person3) # Вывод: Person(name='Charlie', age=35, city='Unknown', scores=[80, 85])
print(person1 == person4) # True
print(person1 > person2) # True (сравнение по name, затем age)
# Изменение атрибутов:
person1.city = "San Francisco"
person1.scores.append(95)
print(person1) # Вывод: Person(name='Alice', age=30, city='San Francisco', scores=[100, 90, 95])
# Post-init processing
@dataclass
class InventoryItem:
"""Class for keeping track of an item in inventory."""
name: str
unit_price: float
quantity_on_hand: int = 0
def __post_init__(self):
self.total_cost = self.unit_price * self.quantity_on_hand
item = InventoryItem("Pen", 1.5, 50)
print(item.total_cost) # 75.0
Person
с различными типами полей, включая значение по умолчанию, field
для скрытия поля в __repr__
и default_factory
для изменяемых значений по умолчанию. Демонстрируется вывод информации о dataclass с помощью print
, сравнение объектов dataclass и изменение атрибутов. Также приведен пример использования __post_init__
для выполнения действий после инициализации dataclass, как, например, вычисление total_cost
в InventoryItem
. order=True
позволяет сравнивать экземпляры dataclass.14 Dec, 13:19
import ctypes
import time
# Загрузка библиотеки C (libcallback_example.so - пример, нужно скомпилировать код C)
lib = ctypes.CDLL("./libcallback_example.so")
# Определение типа функции обратного вызова
CMPFUNC = ctypes.CFUNCTYPE(None, ctypes.c_int)
# Функция Python, которая будет использоваться в качестве обратного вызова
def my_callback(value):
print(f"Python callback called with value: {value}")
# Регистрация функции обратного вызова в библиотеке C
# Предполагается, что в библиотеке C есть функция register_callback, принимающая указатель на функцию
register_callback = lib.register_callback
register_callback.argtypes = [CMPFUNC]
register_callback.restype = None
register_callback(CMPFUNC(my_callback))
# Вызов функции C, которая будет периодически вызывать функцию обратного вызова
# Предполагается, что в библиотеке C есть функция run_loop, которая запускает цикл
run_loop = lib.run_loop
run_loop.argtypes = []
run_loop.restype = None
run_loop()
#include <stdio.h>
#include <unistd.h> // for sleep
typedef void (*callback_function)(int);
static callback_function registered_callback = NULL;
void register_callback(callback_function callback) {
registered_callback = callback;
}
void run_loop() {
int i = 0;
while (i < 5) {
sleep(1);
if (registered_callback != NULL) {
registered_callback(i);
}
i++;
}
}
gcc -shared -o libcallback_example.so -fPIC callback_example.c
ctypes
для вызова функций из динамически подключаемой библиотеки C и передачи функции Python в качестве обратного вызова. Код на C регистрирует коллбек и периодически вызывает его. Важно правильно определить типы данных с помощью ctypes.CFUNCTYPE
и argtypes/restype
. Перед запуском Python кода необходимо скомпилировать код C в shared library (.so
файл для Linux).13 Dec, 18:27
from bs4 import BeautifulSoup
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<title>Example Page</title>
<meta name="description" content="Example description">
</head>
<body>
<h1>This is a heading</h1>
<p>This is some text in a paragraph.</p>
<a href="https://www.example.com">This is a link</a>
<ul>
<li>Item 1</li>
<li>Item 2</li>
</ul>
<div class="my-class">This is a div</div>
<p id="my-id">This is a paragraph with id </p>
</body>
</html>
"""
soup = BeautifulSoup(html_content, "html.parser") # "html.parser" - используемый парсер
# Навигация и поиск элементов:
title = soup.title.string # Получение текста заголовка
print("Title:", title)
description = soup.find("meta", attrs={"name": "description"})["content"]
print("Description:", description)
links = soup.find_all("a")
for link in links:
print("Link:", link["href"])
paragraphs = soup.find_all("p")
for p in paragraphs:
print("Paragraph:", p.text)
list_items = soup.find("ul").find_all("li")
for item in list_items:
print("List item:", item.text)
div_with_class = soup.find("div", class_="my-class")
print("Div with class:", div_with_class.text)
paragraph_with_id = soup.find("p", id="my-id")
print("Paragraph with id:", paragraph_with_id.text)
# Модификация HTML:
new_paragraph = soup.new_tag("p")
new_paragraph.string = "This is a new paragraph."
soup.body.append(new_paragraph)
print(soup.prettify()) # Вывод измененного HTML с отступами
BeautifulSoup
из HTML-строки, получение заголовка, поиск элементов по тегам, атрибутам и классам, извлечение текста и атрибутов элементов, а также добавление нового элемента в HTML-структуру. Используется парсер "html.parser"
, который встроен в Python. Для более сложных случаев можно использовать другие парсеры, такие как lxml
или html5lib
. Вывод soup.prettify()
позволяет увидеть форматированный HTML-код после внесенных изменений.13 Dec, 14:25
print(bool(-1)) # Результат: True
06 Dec, 14:08
os.rename(old_name, new_name)
import os
# Переименуем файл 'старый_файл.txt' в 'новый_файл.txt'
old_name = 'старый_файл.txt'
new_name = 'новый_файл.txt'
# Проверяем, существует ли старый файл
if os.path.exists(old_name):
os.rename(old_name, new_name)
print(f'Файл переименован в {new_name}')
else:
print(f'Файл {old_name} не найден')
new_name
уже существует, функция вызовет ошибку. 06 Dec, 07:02
06 Dec, 07:02
05 Dec, 15:04
import humanize
import datetime
# Пример: время, прошедшее с 24 сентября 2023 года
dt = datetime.datetime(2023, 9, 24)
print(humanize.naturaltime(dt)) # Вывод: "1 месяц назад" (на момент текущей даты)
import humanize
number = 1500
print(humanize.intcomma(number)) # Вывод: "1,500"
print(humanize.intword(number)) # Вывод: "1.5 k"
import humanize
duration = 3661 # время в секундах
print(humanize.precisedelta(datetime.timedelta(seconds=duration)))
# Вывод: "1 час, 1 минутa, 1 секунда"
04 Dec, 14:44
a = 3
b = 10
print('a больше b') if a > b else print('a меньше b')
result = 'Четное' if a % 2 == 0 else 'Нечетное'
print(result)
# Результат:
# a меньше b
# Нечетное
a > b
.result
одного из двух значений в зависимости от четности a
.03 Dec, 14:41
# Старый подход с os.path
import os.path
file_path = os.path.join('data', 'users', 'config.json')
parent_dir = os.path.dirname(file_path)
file_name = os.path.basename(file_path)
# Новый подход с pathlib
from pathlib import Path
file_path = Path('data') / 'users' / 'config.json'
parent_dir = file_path.parent
file_name = file_path.name
path = Path('config.json')
if path.exists():
print('Файл существует!')
Path('nested/directories/structure').mkdir(parents=True, exist_ok=True)
# Найти все .py файлы в текущей директории
python_files = list(Path('.').glob('*.py'))
path = Path('document.pdf')
print(path.suffix) # .pdf
print(path.stem) # document
config_path = (Path.home() / 'projects' / 'app' / 'config.json')
if config_path.exists():
data = json.loads(config_path.read_text())
with Path('log.txt').open('w') as f:
f.write('Logging started')
02 Dec, 15:04
def create_power_function(power):
def power_func(x):
return x ** power
return power_func
# Создаём функции на лету
square = create_power_function(2)
cube = create_power_function(3)
import types
def create_dynamic_function(func_name, args, body):
namespace = {}
func_code = f"def {func_name}({', '.join(args)}):\n{body}"
exec(func_code, globals(), namespace)
return namespace[func_name]
# Магия в действии
greet = create_dynamic_function(
"greet",
["name"],
" return f'Привет, {name}!'"
)
01 Dec, 14:23
pass
, чтобы указать на то, что здесь будет код. Это позволяет сохранить структуру программы и избежать ошибок синтаксиса.pass
может быть полезен при создании минимальных классов или функций. В Python, класс или функция не может быть пустой. Если вы попытаетесь создать пустой класс или функцию, интерпретатор вернет ошибку. Оператор pass
позволяет обойти это ограничение.pass
также может быть использован для управления потоком программы. Иногда в условной конструкции if/elif/else
или в цикле for/while
может не быть необходимости выполнять какое-либо действие. В этих случаях можно использовать оператор pass
для обозначения пустого блока.01 Dec, 06:06
30 Nov, 13:45
def main():
# Точка входа в программу
if __name__ == '__main__':
main()
29 Nov, 14:12
# Без slots
class User:
def __init__(self, name, age):
self.name = name
self.age = age
# Со slots
class OptimizedUser:
__slots__ = ['name', 'age']
def __init__(self, name, age):
self.name = name
self.age = age
class DataPoint:
__slots__ = ['timestamp', 'value', 'sensor_id']
def __init__(self, timestamp, value, sensor_id):
self.timestamp = timestamp
self.value = value
self.sensor_id = sensor_id
# Представьте, что таких объектов у вас миллионы
data_points = [DataPoint(time.time(), random.random(), i) for i in range(1_000_000)]
28 Nov, 13:06
raise
без каких-либо параметров в блоке except
, чтобы повторно вызвать текущее исключение. raise
.def divide(a, b):
if b == 0:
raise ValueError("Деление на ноль невозможно.")
return a / b
try:
result = divide(10, 0)
except ValueError as e:
print(f"Ошибка: {e}")
divide
вызывает исключение ValueError
, если второй аргумент равен нулю. В блоке try
мы пытаемся выполнить деление, и если возникает ошибка, мы перехватываем её и выводим сообщение об ошибке.27 Nov, 15:06
import whylogs as why
import pandas as pd
# Создание примера данных
data = {
"age": [25, 30, 35, 40, 45],
"income": [50000, 60000, 70000, 80000, 90000]
}
df = pd.DataFrame(data)
# Инициализация логгера
logger = why.logger()
# Логируем данные
logger.log_dataframe(df)
# Генерируем отчет
report = logger.report()
# Сохраняем отчет
report.save("report.whylog")
# Выводим на экран
print(report)
age
и income
. Затем мы инициализируем Whylogs логгер, логируем наш DataFrame, генерируем и сохраняем отчет.26 Nov, 13:53
class Bird:
def fly(self):
return "I can fly!"
class Duck(Bird):
def quack(self):
return "Quack!"
class Airplane:
def fly(self):
return "I can also fly!"
def make_it_fly(flyable_thing):
print(flyable_thing.fly())
# Создаем объекты
duck = Duck()
airplane = Airplane()
# Используем их, не смотря на разные типы
make_it_fly(duck) # Выведет: I can fly!
make_it_fly(airplane) # Выведет: I can also fly!
make_it_fly
принимает любой объект, который имеет метод fly()
. Если объект соответствует этому интерфейсу, он будет выполнен, независимо от того, является ли объект уткой, самолетом или чем-то еще.25 Nov, 15:35
24 Nov, 12:31
import ast
code = """
def calculate_sum(a, b):
result = a + b
print(f"Sum: {result}")
return result
"""
# Создаем AST
tree = ast.parse(code)
# Анализируем структуру
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
print(f"Найдена функция: {node.name}")
elif isinstance(node, ast.Name):
print(f"Найдена переменная: {node.id}")
class DebugTransformer(ast.NodeTransformer):
def visit_FunctionDef(self, node):
# Добавляем отладочный print в начало каждой функции
debug_print = ast.Expr(
value=ast.Call(
func=ast.Name(id='print', ctx=ast.Load()),
args=[ast.Constant(value=f"Вызов функции {node.name}")],
keywords=[]
)
)
node.body.insert(0, debug_print)
return node
# Применяем трансформацию
transformed = DebugTransformer().visit(tree)
class StringOptimizer(ast.NodeTransformer):
def visit_BinOp(self, node):
# Оптимизация конкатенации строк
if isinstance(node.op, ast.Add):
if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
if isinstance(node.left.value, str) and isinstance(node.right.value, str):
return ast.Constant(value=node.left.value + node.right.value)
return node
class ComplexityAnalyzer(ast.NodeVisitor):
def __init__(self):
self.complexity = 0
def visit_If(self, node):
self.complexity += 1
self.generic_visit(node)
def visit_For(self, node):
self.complexity += 2
self.generic_visit(node)
def visit_While(self, node):
self.complexity += 2
self.generic_visit(node)
# Использование
analyzer = ComplexityAnalyzer()
analyzer.visit(tree)
print(f"Сложность кода: {analyzer.complexity}")
23 Nov, 08:22
import jmespath
jmespath.search('foo.bar', {'foo': {'bar': 'baz'}})
# output: 'baz'
jmespath.search('foo.*.name', {'foo': {'bar': {'name': 'one'}, 'baz':
{'name': 'two'}}})
# output: ['one', 'two']
22 Nov, 07:00
21 Nov, 07:45
# Сборка образа
docker build -t my-image .
# Запуск контейнера
docker run -d --name my-container my-image
# Просмотр запущенных контейнеров
docker ps
# Остановка контейнера
docker stop my-container
version: '3'
services:
web:
build: .
ports:
- "5000:5000"
redis:
image: "redis:alpine"
# Создание volume
docker volume create my-vol
# Использование volume при запуске контейнера
docker run -v my-vol:/app/data my-image
20 Nov, 17:53
from unittest.mock import patch
import asyncio
async def fetch_data(url):
# Реальный запрос к API
...
@patch('your_module.fetch_data')
async def test_process_data(mock_fetch):
mock_fetch.return_value = {'key': 'value'}
result = await process_data('http://api.example.com')
assert result == 'processed value'
import asyncio
async def test_order_of_execution():
event = asyncio.Event()
results = []
async def task1():
await event.wait()
results.append(1)
async def task2():
results.append(2)
event.set()
await asyncio.gather(task1(), task2())
assert results == [2, 1]
import pytest
import asyncio
@pytest.mark.asyncio
async def test_long_running_task():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(never_ending_task(), timeout=1.0)
import pytest
import asyncio
@pytest.fixture
async def database():
db = await create_database_connection()
yield db
await db.close()
@pytest.mark.asyncio
async def test_database_query(database):
result = await database.fetch('SELECT * FROM users')
assert len(result) > 0
pytest -n auto your_test_file.py
19 Nov, 15:13
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()
loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=10))
sem = asyncio.Semaphore(10)
async def controlled_task(i):
async with sem:
# Ваш асинхронный код здесь
await asyncio.sleep(1)
print(f"Задача {i} выполнена!")
class AsyncContextManager:
async def __aenter__(self):
print("Entering the matrix...")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print("Exiting the matrix...")
await asyncio.sleep(1)
async def main():
async with AsyncContextManager() as manager:
print("We're in!")
async def async_range(start, stop):
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main():
async for num in async_range(0, 10):
print(num)
18 Nov, 10:34
16 Nov, 14:19
15 Nov, 11:07
MyClass = type('MyClass', (), {'x': 42, 'my_method': lambda self: print("Hello!")})
obj = MyClass()
print(obj.x) # Выведет: 42
obj.my_method() # Выведет: Hello!
class MyMetaclass(type):
def __new__(cls, name, bases, attrs):
attrs['additional_method'] = lambda self: print("I'm additional!")
return super().__new__(cls, name, bases, attrs)
class MyClass(metaclass=MyMetaclass):
pass
obj = MyClass()
obj.additional_method() # Выведет: I'm additional!
14 Nov, 16:26
from functools import cache
@cache
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(100)) # Мгновенный результат даже для больших чисел
from functools import wraps
def my_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
"""Это документация обертки"""
print("До вызова функции")
result = func(*args, **kwargs)
print("После вызова функции")
return result
return wrapper
@my_decorator
def say_hello(name):
"""Приветствует пользователя по имени"""
print(f"Привет, {name}!")
say_hello("Мария")
print(say_hello.__name__) # Выведет: say_hello
print(say_hello.__doc__) # Выведет: Приветствует пользователя по имени
14 Nov, 09:16
from functools import reduce
numbers = [1, 2, 3, 4, 5]
product = reduce(lambda x, y: x * y, numbers)
print(product) # Выведет: 120
from functools import singledispatch
@singledispatch
def process(arg):
print(f"Обработка объекта: {arg}")
@process.register(int)
def _(arg):
print(f"Обработка целого числа: {arg}")
@process.register(list)
def _(arg):
print(f"Обработка списка длиной {len(arg)}")
process("строка") # Обработка объекта: строка
process(42) # Обработка целого числа: 42
process([1, 2, 3]) # Обработка списка длиной 3
from functools import total_ordering
@total_ordering
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def __eq__(self, other):
return self.age == other.age
def __lt__(self, other):
return self.age < other.age
p1 = Person("Алиса", 25)
p2 = Person("Боб", 30)
print(p1 < p2) # True
print(p1 <= p2) # True
print(p1 > p2) # False
print(p1 >= p2) # False
13 Nov, 10:32
12 Nov, 08:51
# Установка библиотеки
!pip install fugue
# Импорт необходимых модулей
from fugue import FugueEngine
from fugue.spark import SparkExecutionEngine
# Пример обработки данных
def process_data(df):
return df.groupby("category").agg({"value": "sum"})
# Инициализация движка
engine = SparkExecutionEngine()
# Чтение данных
data = [
{"category": "A", "value": 10},
{"category": "B", "value": 20},
{"category": "A", "value": 30},
]
# Обработка данных
result = engine.run(data, process_data)
# Вывод результата
print(result)
11 Nov, 12:31
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lineapy import LineaPy
# Создаем синтетические данные
np.random.seed(42)
X = np.random.rand(100, 1) * 10 # 100 случайных точек в диапазоне от 0 до 10
y = 2.5 * X + np.random.randn(100, 1) * 2 # Линейная зависимость с шумом
# Преобразуем данные в DataFrame
data = pd.DataFrame(np.hstack((X, y)), columns=['X', 'y'])
# Создаем модель с помощью LineaPy
model = LineaPy()
model.fit(data['X'], data['y'])
# Предсказания
predictions = model.predict(data['X'])
# Визуализация
plt.scatter(data['X'], data['y'], color='blue', label='Данные')
plt.plot(data['X'], predictions, color='red', label='Предсказание')
plt.xlabel('X')
plt.ylabel('y')
plt.title('Линейная Регрессия с LineaPy')
plt.legend()
plt.show()
10 Nov, 09:42
from memory_profiler import profile
@profile
def my_func():
a = [1] * (10 ** 6)
b = [2] * (2 * 10 ** 7)
del b
return a
if __name__ == '__main__':
my_func()
python -m memory_profiler script.py
, и вы увидите подробный отчет о использовании памяти. Красота, правда?@profile
def my_func():
a = [1] * (10 ** 6)
b = [2] * (2 * 10 ** 7)
del b
return a
my_func()
kernprof -l -v script.py
, и вы увидите, какая строчка сколько памяти съедает.import objgraph
x = []
y = [x, [x], dict(x=x)]
objgraph.show_refs([y], filename='sample-graph.png')
import tracemalloc
tracemalloc.start()
# ваш код здесь
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Топ 10 ]")
for stat in top_stats[:10]:
print(stat)
09 Nov, 11:50
import functools
def spy_args(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
print(f"Вызов {func.__name__} с аргументами: {args}, {kwargs}")
return func(*args, **kwargs)
return wrapper
@spy_args
def секретная_функция(x, y, шифр="007"):
return x + y
результат = секретная_функция(3, 4, шифр="008")
def to_json(func):
import json
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
return json.dumps(result)
return wrapper
@to_json
def получить_данные():
return {"имя": "Алиса", "возраст": 30}
json_data = получить_данные()
def применить_все(*funcs):
def декоратор(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
result = f(*args, **kwargs)
for func in funcs:
result = func(result)
return result
return wrapper
return декоратор
def удвоить(x): return x * 2
def прибавить_один(x): return x + 1
@применить_все(удвоить, прибавить_один)
def базовая_функция(x):
return x
результат = базовая_функция(10) # Вернёт 21
class Ленивый:
def __init__(self, function):
self.function = function
self.результат = None
def __call__(self, *args, **kwargs):
if self.результат is None:
self.результат = self.function(*args, **kwargs)
return self.результат
@Ленивый
def сложные_вычисления():
print("Выполняю сложные вычисления...")
return 42
результат = сложные_вычисления() # Вычисления выполняются
результат = сложные_вычисления() # Используется кэшированный результат
08 Nov, 14:18
async def async_range(start, stop):
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i
async def main():
async for num in async_range(0, 5):
print(num)
asyncio.run(main())
07 Nov, 14:36
from langchain import LLMChain
from langchain.llms import OpenAI
# Инициализация языковой модели
llm = OpenAI(api_key="YOUR_API_KEY")
# Определение шаблона для вопроса
template = "Какой ответ на вопрос: {question}?"
# Создание цепочки с использованием шаблона
chain = LLMChain(llm=llm, prompt=template)
# Ввод вопроса
question = "Что такое LangChain?"
# Получение ответа
response = chain.run({"question": question})
print(response)
07 Nov, 07:38
pip install pr-agent
from pr_agent import cli
from pr_agent.config_loader import get_settings
def main():
provider = "github"
user_token = "..."
openai_key = "..."
pr_url = "..."
command = "/review"
get_settings().set("CONFIG.git_provider", provider)
get_settings().set("openai.key", openai_key)
get_settings().set("github.user_token", user_token)
cli.run_command(pr_url, command)
if __name__ == '__main__':
main()
docker pull docker.all-hands.dev/all-hands-ai/runtime:0.12-nikolaik
docker run -it --rm --pull=always \
-e SANDBOX_RUNTIME_CONTAINER_IMAGE=docker.all-hands.dev/all-hands-ai/runtime:0.12-nikolaik \
-v /var/run/docker.sock:/var/run/docker.sock \
-p 3000:3000 \
--add-host host.docker.internal:host-gateway \
--name openhands-app \
docker.all-hands.dev/all-hands-ai/openhands:0.12
pip install vanna
from vanna.openai.openai_chat import OpenAI_Chat
from vanna.chromadb.chromadb_vector import ChromaDB_VectorStore
class MyVanna(ChromaDB_VectorStore, OpenAI_Chat):
def __init__(self, config=None):
ChromaDB_VectorStore.__init__(self, config=config)
OpenAI_Chat.__init__(self, config=config)
vn = MyVanna(config={'api_key': 'sk-...', 'model': 'gpt-4-...'})
# Обучение модели
vn.train(ddl="""
CREATE TABLE IF NOT EXISTS my-table (
id INT PRIMARY KEY,
name VARCHAR(100),
age INT
)
""")
# Задать вопрос
sql_query = vn.ask("What are the top 10 customers by sales?")
print(sql_query)
06 Nov, 09:00
pip install compsio-core swekit
pip install crewai composio-crewai
composio add github
swekit scaffold crewai -o swe_agent
cd swe_agent/agent
python main.py
pip install aider-chat
cd /to/your/git/repo
export ANTHROPIC_API_KEY=your-key-goes-here
aider
# Или для работы с GPT-4
export OPENAI_API_KEY=your-key-goes-here
aider
python3 -m venv .venv
source .venv/bin/activate
git clone https://github.com/AbanteAI/mentat.git
cd mentat
pip install -e .
export OPENAI_API_KEY=<your key here>
mentat <paths to files or directories>
export OPENAI_KEY=sk-YOUR-OPENAI-API-KEY-HERE
docker build -f Dockerfile -t acr .
docker run -it -e OPENAI_KEY="${OPENAI_KEY:-OPENAI_API_KEY}" -p 3000:3000 -p 5000:5000 acr
05 Nov, 09:08
lambda
, в отличие от обычных функций, которые определяются с помощью def
. Они позволяют писать более чистый и читаемый код, устраняя необходимость во временных определениях функций.lambda arguments: expression
add = lambda x, y: x + y
result = add(3, 5)
print(result) # Выведет: 8
map()
применяет функцию к каждому элементу итерируемого объекта:numbers = [1, 2, 3, 4]
squared = list(map(lambda x: x ** 2, numbers))
print(squared) # Выведет: [1, 4, 9, 16]
filter()
используется для фильтрации элементов:numbers = [1, 2, 3, 4]
even = list(filter(lambda x: x % 2 == 0, numbers))
print(even) # Выведет: [2, 4]
sorted()
позволяет сортировать элементы по заданному критерию:points = [(1, 2), (3, 1), (5, -1)]
points_sorted = sorted(points, key=lambda point: point[1])
print(points_sorted) # Выведет: [(5, -1), (3, 1), (1, 2)]
nested_lambda = lambda x: (lambda y: y ** 2)(x) + 1
print(nested_lambda(3)) # Выведет: 10
import pandas as pd
data = {'A': [1, 2, 3], 'B': [4, 5, 6]}
df = pd.DataFrame(data)
df['C'] = df.apply(lambda row: row['A'] + row['B'], axis=1)
print(df)
04 Nov, 08:53
def fractional_knapsack(items, capacity):
# Sort items by their value-to-weight ratio in descending order
items.sort(key=lambda x: x[1] / x[0], reverse=True)
total_value = 0
remaining_capacity = capacity
for item in items:
if remaining_capacity >= item[0]:
total_value += item[1]
remaining_capacity -= item[0]
else:
total_value += (remaining_capacity / item[0]) * item[1]
break
return total_value
# Example usage:
items = [(2, 10), (3, 5), (5, 15), (7, 7), (1, 6)]
knapsack_capacity = 10
max_value = fractional_knapsack(items, knapsack_capacity)
print(max_value)
03 Nov, 10:14
import heapq
unsorted_array = [100, 230, 44, 1, 74, 12013, 84]
heapq.heapify(unsorted_array)
print(unsorted_array)
# [1, 74, 44, 230, 100, 12013, 84]
sorted_array = []
for _ in range(len(unsorted_array)):
sorted_array.append(heapq.heappop(unsorted_array))
print(sorted_array)
# [1, 44, 74, 84, 100, 230, 12013]
class MinHeap:
def __init__(self):
self.nodes = []
def add(self, item):
self.nodes.append(item)
self.__heapify_up()
def poll(self):
if self.is_empty():
return None
removed_node = self.nodes[0]
self.nodes[0] = self.nodes[-1]
del self.nodes[-1]
self.__heapify_down()
return removed_node
def peek(self):
return self.nodes[0] if not self.is_empty() else None
def is_empty(self):
return len(self.nodes) == 0
def __heapify_up(self):
# Реализация метода подъема элемента
def __heapify_down(self):
# Реализация метода опускания элемента
02 Nov, 08:56
01 Nov, 17:16
def simple_hash(input_str, table_size):
hash_value = 0
for char in input_str:
hash_value += ord(char)
return hash_value % table_size
def polynomial_hash(input_str, table_size, prime=31):
hash_value = 0
for i, char in enumerate(input_str):
hash_value += ord(char) * (prime ** i)
return hash_value % table_size
def fnv1a_hash(key, table_size):
FNV_prime = 16777619
FNV_offset_basis = 2166136261
hash_value = FNV_offset_basis
for char in key:
hash_value ^= ord(char)
hash_value *= FNV_prime
hash_value &= 0xffffffff # Обеспечивает 32-битный хэш
return hash_value % table_size
def xx_hash(input_str, table_size):
return xxhash.xxh32(input_str).intdigest() % table_size
def sip_hash(input_str, table_size, key=b'secretkey'):
hash_value = hmac.new(key, input_str.encode(), digestmod='sha256').hexdigest()
return int(hash_value, 16) % table_size
def murmur_hash(input_str, table_size):
hash_value = mmh3.hash(input_str) % table_size
return hash_value
31 Oct, 18:14
from manim import *
class SquareNumber(Scene):
def construct(self):
# Создаем квадрат и текст
square = Square(side_length=2)
number = MathTex("0").scale(2)
# Центрируем квадрат и текст
square.move_to(ORIGIN)
number.next_to(square, DOWN)
# Добавляем квадрат и текст на экран
self.play(Create(square), Write(number))
self.wait(1)
# Изменяем текст на 1
self.play(Transform(number, MathTex("1").scale(2)))
self.wait(1)
# Изменяем текст на 4 и меняем размер квадрата
self.play(Transform(number, MathTex("4").scale(2)),
square.animate.scale(2))
self.wait(1)
# Изменяем текст на 9 и меняем размер квадрата
self.play(Transform(number, MathTex("9").scale(2)),
square.animate.scale(3))
self.wait(1)
# Завершаем сцену
self.play(FadeOut(square), FadeOut(number))
# Для запуска сцены используйте следующую команду в терминале
# manim -pql имя_файла.py SquareNumber
square_number.py
, и выполните указанную команду в терминале.31 Oct, 09:22
class IterableCounter:
def __init__(self, start, end):
self.current = start
self.end = end
def __iter__(self):
return self
def __next__(self):
if self.current > self.end:
raise StopIteration
else:
self.current += 1
return self.current - 1
counter = IterableCounter(1, 5)
for num in counter:
print(num)
30 Oct, 10:30
{
'🔥': "5104841245755180586",
'👍': "5107584321108051014",
'👎': "5104858069142078462",
'❤️': "5044134455711629726",
'🎉': "5046509860389126442",
'💩': "5046589136895476101"
}
29 Oct, 09:33
25 Oct, 08:49
body
DOM или случайное место, а именно в указанный вами элемент. Мы можем использовать это для реализации маршрутизации, используя одну точку монтирования для каждой страницы:ReactDOM.render(
<h1>Страница 1!</h1>,
document.getElementById('page-1')
);
ReactDOM.render(
<h1>Страница 2!</h1>,
document.getElementById('page-2')
);
<div>
с соответствующим идентификатором:{% extends "base.html" %}
{% load static %}
{% block content %}
<div id="{{ element_id }}"></div>
<script src="{% static 'index.js' %}"></script>
{% endblock %}
let Page1 = React.lazy(() => import('./page1'))
let Page2 = React.lazy(() => import('./page2'))
ReactDOM.render(
<Suspense fallback={<></>}>
<Page1/>
</Suspense>,
document.getElementById('page-1')
);
json_script
:{% extends "base.html" %}
{% load static %}
{% block content %}
<div id="{{ element_id }}"></div>
{{ page_context | json_script:'page-context' }}
<script src="{% static 'index.js' %}"></script>
{% endblock %}
export let usePageContext = <T = any>() => {
let [pageContext, setPageContext] = useState<T | undefined>(undefined)
useEffect(() => {
let pageContext = document.getElementById('page-context').textContent
setPageContext(JSON.parse(pageContext))
}, [])
return pageContext as T
}
const TodosIndexPage = memo(() => {
let pageContext = usePageContext<{ todos: Todo[] }>()
let todos = pageContext?.todos
return <>
<h1>React todos page</h1>
<ul>
{todos?.map(todo => <li key={todo.id}>{todo.title}</li>)}
</ul>
</>
})
24 Oct, 09:03
23 Oct, 16:50
pip install qrcode Flask
import qrcode
from flask import Flask, request, render_template, send_file
app = Flask(__name__)
@app.route("/", methods=["GET", "POST"])
def generate_qr():
if request.method == "POST":
data = request.form["data"]
qr_img = qrcode.make(data)
qr_path = "static/qr_code.png"
qr_img.save(qr_path)
return send_file(qr_path, mimetype='image/png')
return render_template("index.html")
if __name__ == "__main__":
app.run(debug=True)
<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<title>QR-код генератор</title>
</head>
<body>
<h1>QR-код генератор</h1>
<form method="POST">
<input type="text" name="data" placeholder="Введите текст или URL">
<button type="submit">Сгенерировать QR-код</button>
</form>
{% if qr_code %}
<img src="{{ qr_code }}" alt="QR-код">
{% endif %}
</body>
</html>
python app.py
23 Oct, 09:02
googlesearch
— это простая в использовании библиотека для Python, которая позволяет отправлять запросы в Google и получать ссылки на результаты поиска прямо из вашего кода. Она предоставляет гибкий и мощный интерфейс, который можно настроить для различных задач поиска.search()
, которая возвращает результаты поиска в виде генератора, содержащего объекты SearchResult
или URL-ссылки в зависимости от параметров.term
— строка с запросом, которую вы хотите найти в Google.num_results
— количество результатов, которые нужно вернуть (по умолчанию 10).lang
— язык результатов поиска (по умолчанию "en").proxy
— возможность использования прокси для запросов (по умолчанию None
).advanced
— расширенный режим поиска, возвращает объекты с более детализированной информацией, такими как заголовок и описание страницы (по умолчанию False
).sleep_interval
— время ожидания между запросами в секундах (по умолчанию 0).timeout
— максимальное время ожидания ответа от Google (по умолчанию 5 секунд).safe
— настройка безопасного поиска: "off" для отключения, "active" для фильтрации контента для взрослых (по умолчанию "active").ssl_verify
— возможность включить или отключить проверку SSL-сертификатов (по умолчанию None
).region
— регион для таргетирования поиска (по умолчанию None
).from googlesearch import search
query = "Python best practices"
for result in search(query, num_results=5):
print(result)
for result in search("новости технологий", num_results=3, lang='ru', safe='off', region='RU', advanced=True):
print(result.title, result.url)
RU
. В данном случае возвращаются не только ссылки, но и заголовки страниц.googlesearch
- это мощный инструмент, который позволяет интегрировать Google-поиск в Python-приложения, делая его отличным для автоматизации и поиска данных.23 Oct, 07:00
22 Oct, 08:56
print(2**1000)
10715086071862673209484250490600018105614048117055336074437503883703510511249361224931983788156958581275946729175531468251871452856923140435984577574698574803934567774824230985421074605062371141877954182153046474983581941267398767559165543946077062914571196477686542167660429831652624386837205668069376
21 Oct, 09:01
20 Oct, 16:22
import typer
app = typer.Typer()
@app.command()
def hello(name: str):
# Приветствие пользователя
print(f"Hello {name}")
@app.command()
def goodbye(name: str, formal: bool = False):
# Прощание с пользователем
if formal:
print(f"Goodbye Ms./Mr. {name}. Have a good day.")
else:
print(f"Bye {name}!")
if __name__ == "__main__":
app()
20 Oct, 09:03
composio-core
- это мощная библиотека для Python, предназначенная для упрощения процесса композиции и декомпозиции сложных объектов. Она предоставляет элегантный и интуитивно понятный API, который позволяет разработчикам легко создавать, модифицировать и анализировать сложные структуры данных.composio-core
очень просто. Вы можете использовать pip, стандартный менеджер пакетов Python. Вот команда для установки:pip install composio-core
composio-core
для создания и анализа сложного объекта:from composio_core import Composer, Analyzer
# Создаем композицию
composer = Composer()
complex_object = composer.create({
"name": "Проект X",
"components": [
{"type": "module", "name": "Auth", "version": "1.2.0"},
{"type": "database", "name": "UserDB", "engine": "PostgreSQL"},
{"type": "service", "name": "EmailNotifier", "protocol": "SMTP"}
]
})
# Анализируем созданный объект
analyzer = Analyzer()
analysis_result = analyzer.analyze(complex_object)
print(analysis_result.summary())
print(f"Количество компонентов: {analysis_result.component_count}")
print(f"Типы компонентов: {', '.join(analysis_result.component_types)}")
composio-core
.composio-core
, становится более читаемым и понятным. Это особенно важно при работе в команде или при поддержке долгосрочных проектов.composio-core
позволяет легко расширять функциональность библиотеки с помощью плагинов, что делает ее идеальным выбором для проектов с уникальными требованиями.composio-core
предлагает ряд продвинутых возможностей:import asyncio
from composio_core import AsyncComposer
async def create_complex_object():
composer = AsyncComposer()
object = await composer.create_async({
"name": "Async Project",
"components": [
{"type": "api", "name": "UserAPI", "version": "2.0.0"},
{"type": "queue", "name": "TaskQueue", "technology": "RabbitMQ"}
]
})
return object
complex_object = asyncio.run(create_complex_object())
print(complex_object)
composio-core
легко интегрируется с популярными Python-фреймворками, такими как Django и Flask. Например, вот как можно использовать библиотеку в Django-проекте:from django.views import View
from django.http import JsonResponse
from composio_core import Composer
class ProjectView(View):
def post(self, request):
composer = Composer()
project = composer.create(request.POST)
return JsonResponse({"project": project.to_dict()})
composio-core
представляет собой мощный и гибкий инструмент для работы со сложными структурами данных в Python. composio-core
может значительно упростить процесс работы с комплексными объектами и структурами данных.20 Oct, 07:00
19 Oct, 09:03
colors = {"red", "blue", "green"}
print(colors) # Вывод: {'blue', 'green', 'red'}
fruits = {"apple", "banana", "cherry"}
fruits.add("orange")
print(fruits) # Вывод: {'apple', 'banana', 'cherry', 'orange'}
fruits.add("apple") # Ничего не изменится
print(fruits) # Вывод: {'apple', 'banana', 'cherry', 'orange'}
numbers = {1, 2, 3, 4, 5}
numbers.remove(3)
print(numbers) # Вывод: {1, 2, 4, 5}
numbers.discard(10) # Ничего не произойдет
print(numbers) # Вывод: {1, 2, 4, 5}
numbers.remove(10) # Вызовет ошибку KeyError
set1 = {1, 2, 3}
set2 = {3, 4, 5}
set3 = set1.union(set2)
print(set3) # Вывод: {1, 2, 3, 4, 5}
pizza_lovers = {"Алиса", "Боб", "Чарли", "Дэвид"}
ice_cream_lovers = {"Боб", "Чарли", "Ева", "Фрэнк"}
pizza_and_ice_cream = pizza_lovers.intersection(ice_cream_lovers)
print(pizza_and_ice_cream) # Вывод: {'Боб', 'Чарли'}
set1 = {1, 2, 3, 4, 5}
set2 = {4, 5, 6, 7, 8}
diff = set1.difference(set2)
print(diff) # Вывод: {1, 2, 3}
group1 = {"пицца", "бургер", "суши"}
group2 = {"суши", "рамен", "пицца"}
unique_preferences = group1.symmetric_difference(group2)
print(unique_preferences) # Вывод: {'бургер', 'рамен'}
19 Oct, 07:00
18 Oct, 07:47
import geopandas as gpd
# Чтение геоданных
gdf = gpd.read_file('path/to/your/geodata.shp')
# Базовые операции
print(gdf.head())
print(gdf.crs) # Система координат
import matplotlib.pyplot as plt
gdf.plot()
plt.title('Визуализация геоданных')
plt.show()
buffered = gdf.geometry.buffer(1) # Создание буфера в 1 единицу
cities = gpd.read_file('cities.shp')
countries = gpd.read_file('countries.shp')
cities_with_countries = gpd.sjoin(cities, countries, how="inner", predicate="within")
17 Oct, 14:59
17 Oct, 07:57
import spacy
nlp = spacy.load("ru_core_news_sm")
doc = nlp("Кошки любят спать на мягких подушках.")
for token in doc:
print(f"{token.text} -> {token.lemma_}")
for token in doc:
print(f"{token.text} - {token.pos_}")
for ent in doc.ents:
print(f"{ent.text} - {ent.label_}")
def generate_sentence(subject, verb, object):
doc = nlp(f"{subject} {verb} {object}")
return " ".join([token.text for token in doc])
print(generate_sentence("Программист", "пишет", "код"))
def find_similar_word(word, n=3):
token = nlp(word)[0]
similar_words = []
for lex in nlp.vocab:
if lex.has_vector:
if lex.is_lower == token.is_lower and lex.is_alpha:
similarity = token.similarity(lex)
similar_words.append((lex.text, similarity))
return sorted(similar_words, key=lambda x: x[1], reverse=True)[:n]
print(find_similar_word("компьютер"))
16 Oct, 13:28
import torch.nn as nn
class SimpleNN(nn.Module):
def __init__(self):
super(SimpleNN, self).__init__()
self.fc1 = nn.Linear(784, 128)
self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
for epoch in range(num_epochs):
for batch in data_loader:
optimizer.zero_grad()
outputs = model(batch)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
15 Oct, 15:28
population
— последовательность, из которой надо выбрать элементы (list
, tuple
, string
и т. д.)k
— количество элементов для выборки.counts
— список весов элементов (по умолчанию равновероятный выбор).rng
— генератор случайных чисел (по умолчанию берется из модуля random
).from random import sample
letters = ['a', 'b', 'c', 'd', 'e']
result = sample(letters, k=3)
print(result)
15 Oct, 09:13
import cProfile
def my_function():
# Ваш код здесь
pass
cProfile.run('my_function()')
@profile
def my_function():
# Ваш код здесь
pass
# Запустите с: kernprof -l -v your_script.py
14 Oct, 07:15
import concurrent.futures
import time
def task(name):
print(f"Задача {name} начата")
time.sleep(2)
return f"Задача {name} завершена"
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
tasks = [executor.submit(task, f"#{i}") for i in range(5)]
for future in concurrent.futures.as_completed(tasks):
print(future.result())
13 Oct, 08:09
pip install faststream
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
# Создаем брокер Kafka
broker = KafkaBroker("localhost:9092")
# Инициализируем FastStream
app = FastStream(broker)
# Определяем обработчик сообщений
@broker.subscriber("input-topic")
async def process_message(msg: str, logger: Logger):
logger.info(f"Получено сообщение: {msg}")
# Обработка сообщения
processed_msg = msg.upper()
# Отправка обработанного сообщения
await broker.publish(processed_msg, "output-topic")
# Запускаем приложение
if __name__ == "__main__":
app.run()
12 Oct, 10:55
# Legacy код на Python
def calculate_total(items):
total = 0
for item in items:
total = total + item['price'] * item['quantity']
return total
# Современный эквивалент
def calculate_total(items):
return sum(item['price'] * item['quantity'] for item in items)
11 Oct, 08:52
import weakref
class MyClass:
pass
obj = MyClass()
weak_ref = weakref.ref(obj)
print(weak_ref()) # Выводит объект MyClass
del obj
print(weak_ref()) # Выводит None
10 Oct, 15:50
import dask.array as da
# Создаем большой массив
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Выполняем операции
result = (x + 1).mean().compute()
print(f"Среднее значение: {result}")
import dask.dataframe as dd
# Читаем большой CSV файл
df = dd.read_csv('huge_file.csv')
# Выполняем группировку и агрегацию
result = df.groupby('category').agg({'value': 'mean'}).compute()
print(result)
from dask import delayed
@delayed
def process_data(x):
# Здесь может быть сложная обработка
return x * 2
data = [1, 2, 3, 4, 5]
results = [process_data(x) for x in data]
final_result = delayed(sum)(results).compute()
print(f"Итоговый результат: {final_result}")
10 Oct, 10:26
def my_decorator(func):
def wrapper(*args, **kwargs):
print("До выполнения функции")
result = func(*args, **kwargs)
print("После выполнения функции")
return result
return wrapper
@my_decorator
def greet(name):
"""Эта функция приветствует пользователя"""
print(f"Привет, {name}!")
print(greet.__name__) # Выводит: wrapper
print(greet.__doc__) # Выводит: None
from functools import wraps
def my_decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("До выполнения функции")
result = func(*args, **kwargs)
print("После выполнения функции")
return result
return wrapper
@my_decorator
def greet(name):
"""Эта функция приветствует пользователя"""
print(f"Привет, {name}!")
print(greet.__name__) # Выводит: greet
print(greet.__doc__) # Выводит: Эта функция приветствует пользователя
functools.wraps
: