Streaming tweets using Twitter V2 API | Tweepy
Get live tweets using Twitter V2 API and store them in a database.
With the Twitter API v2, things have changed when it comes to streaming tweets. Today we're going to see how to use StreamingClient to stream tweets and store them in an SQLite3 database.
About Twitter V2 API
To stream tweets, you will most likely need to apply for "Elevated" access.
The application process is fairly simple. Once the application has been submitted, you will receive an "approval" email from the Twitter Dev team.
Things to be done on your Twitter developer portal
After you've got your Elevated access, visit the Developer Portal to get your projects and apps ready.
Open the Projects & Apps menu on the left side of the developer portal and add an application as required.
Click on "Add app":
- Select App environment
- App name
- Keys & Token
Next, you will get your API keys and tokens along with a bearer token.
💡 Save them, because we'll need them to make requests to the Twitter API.
Now, let's move on to the next section.
Installing tweepy
Installing Tweepy is pretty straightforward 📏.
The official tweepy documentation has everything we need. Make sure to have a look at it.
- Make a python virtual environment
python -m venv venv
- Install tweepy
pip install tweepy
See, that's not hard😸.
Now that we are done with the requirements, we can move to the coding section.
Let's write some code
Before that, let's structure our code.
- Make a database directory where we'll store the SQLite DB files.
- A main.py file where all our code goes, and
- A .env file that will store all our API keys and tokens.
💡 For this project, I have put everything into one file but you can always refactor them into separate modules as per requirements.
Now, we are ready! 🚗
1. Store the API keys and tokens in a .env file
API_KEY="apikeygoeshere"
API_KEY_SECRET="apikeysecretgoeshere"
ACCESS_TOKEN="accesstokengoeshere"
ACCESS_TOKEN_SECRET="accesstokensecretgoeshere"
BEARER_TOKEN="bearertokengoeshere"
2. Importing all necessary packages
from dotenv import load_dotenv
import os
import sqlite3
import tweepy
import time
import argparse
3. Loading the API credentials
load_dotenv()
api_key = os.getenv("API_KEY")
api_key_secret = os.getenv("API_KEY_SECRET")
access_key = os.getenv("ACCESS_TOKEN")
access_key_secret = os.getenv("ACCESS_TOKEN_SECRET")
bearer_token = os.getenv("BEARER_TOKEN")
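Because os.getenv quietly returns None for a name that is not set, a misspelled variable only shows up later as an authentication error. Here is a minimal sketch of an early check, assuming the variable names from the .env file in step 1. The helper name missing_credentials is hypothetical and not part of Tweepy or python-dotenv.

```python
import os

# Names taken from the .env file in step 1.
REQUIRED_VARS = [
    "API_KEY",
    "API_KEY_SECRET",
    "ACCESS_TOKEN",
    "ACCESS_TOKEN_SECRET",
    "BEARER_TOKEN",
]


def missing_credentials(env=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper: pass a dict to check it, or nothing to
    check the real process environment.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_credentials() right after load_dotenv() and stopping when the list is not empty would surface a typo before the stream ever connects.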
4. Creating the database
conn = sqlite3.connect("./database/tweets.db")
print("DB created!")
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS tweets (username TEXT,tweet TEXT)")
print("Table created")
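Note that sqlite3.connect creates the database file when it does not exist, but not the folder that contains it, so the call fails if the database directory is missing. Below is a small sketch that creates the folder first. The function name open_tweet_db is hypothetical; the path and schema are the ones used in this step.

```python
import os
import sqlite3


def open_tweet_db(path="./database/tweets.db"):
    """Create the parent folder if needed, then open the DB and table."""
    folder = os.path.dirname(path)
    if folder:
        # exist_ok=True makes this safe to call on every run
        os.makedirs(folder, exist_ok=True)
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS tweets (username TEXT, tweet TEXT)")
    conn.commit()
    return conn
```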
5. Creating the Streaming class
class TweetStreamV2(tweepy.StreamingClient):
    # holds the tweet that is currently being processed
    new_tweet = {}

    def on_connect(self):
        print("Connected!")

    def on_includes(self, includes):
        # nothing to store if on_tweet skipped this tweet
        if "tweet" not in self.new_tweet:
            return
        self.new_tweet["username"] = includes["users"][0].username
        print(self.new_tweet)
        # insert tweets in db
        cursor.execute(
            "INSERT INTO tweets VALUES (?,?)",
            (
                self.new_tweet["username"],
                self.new_tweet["tweet"],
            ),
        )
        conn.commit()
        print("tweet added to db!")
        print("-" * 30)

    def on_tweet(self, tweet):
        # start fresh so a skipped tweet never leaves stale text behind
        self.new_tweet = {}
        # keep only original tweets (no retweets, quotes or replies)
        if tweet.referenced_tweets is None:
            self.new_tweet["tweet"] = tweet.text
            print(tweet.text)
            time.sleep(0.3)
What does the code say?
Before moving into details, please have a look at the StreamingClient documentation. This will make things clearer.
- The on_connect method prints a "Connected" message, letting us know that we have successfully connected to the Twitter API.
- The on_tweet method receives a tweet and processes it according to the conditions, if there are any, and adds the tweet to the hashmap.
- The on_includes method is responsible for the user details and adds the user data to the hashmap.
- Finally, the data in the hashmap is inserted into the tweets table.
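The hand-off between the two callbacks can be tried without a network connection. The sketch below uses a hypothetical stand-in class, TweetBuffer, instead of tweepy.StreamingClient, and it assumes the tweet callback fires before the includes callback for each message. The plain dictionaries are simplified stand-ins for the objects Tweepy passes in, and the in-memory database replaces tweets.db.

```python
import sqlite3


class TweetBuffer:
    """Stand-in for the streaming class: same hand-off, no network."""

    def __init__(self, conn):
        self.conn = conn
        self.new_tweet = {}

    def on_tweet(self, tweet):
        # Keep only original tweets; anything that references another
        # tweet (retweet, quote, reply) clears the buffer instead.
        if tweet.get("referenced_tweets") is None:
            self.new_tweet = {"tweet": tweet["text"]}
        else:
            self.new_tweet = {}

    def on_includes(self, includes):
        if "tweet" not in self.new_tweet:
            return  # nothing buffered for this message
        self.new_tweet["username"] = includes["users"][0]["username"]
        self.conn.execute(
            "INSERT INTO tweets VALUES (?, ?)",
            (self.new_tweet["username"], self.new_tweet["tweet"]),
        )
        self.conn.commit()
        self.new_tweet = {}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (username TEXT, tweet TEXT)")
buffer = TweetBuffer(conn)

# An original tweet is stored...
buffer.on_tweet({"text": "hello", "referenced_tweets": None})
buffer.on_includes({"users": [{"username": "alice"}]})
# ...while a retweet is skipped.
buffer.on_tweet({"text": "RT hello", "referenced_tweets": [{"type": "retweeted"}]})
buffer.on_includes({"users": [{"username": "bob"}]})

rows = conn.execute("SELECT username, tweet FROM tweets").fetchall()
```

After the four calls, rows holds only the pair for the original tweet, which is the behaviour the hashmap hand-off is meant to give.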
6. Main function
def main():
    # get args
    parser = argparse.ArgumentParser()
    parser.add_argument("search_query", help="Twitter search query")
    args = parser.parse_args()
    query = args.search_query
    stream = TweetStreamV2(bearer_token)
    # delete previous rules, if any (data is None when no rules exist)
    prev_rules = stream.get_rules().data
    if prev_rules:
        stream.delete_rules([rule.id for rule in prev_rules])
    # add new query
    stream.add_rules(tweepy.StreamRule(query))
    print(stream.get_rules())
    stream.filter(
        # referenced_tweets is needed for the check in on_tweet
        tweet_fields=["created_at", "lang", "referenced_tweets"],
        expansions=["author_id"],
        user_fields=["username", "name"],
    )


if __name__ == "__main__":
    main()
What does the code say?
- The Python script takes an argument, search_query.
- This argument is added to the stream rules after deleting the previously added rules.
- Rules are basically search queries that go in as input into the stream object. There can be more than one rule, and each rule has a value, a tag and an id.
- The id is passed on to the delete_rules method to delete a rule.
💡 I suggest you refer to the official documentation for more details on adding and deleting rules.
- Next, we have the filter method. It starts the stream, which delivers the tweets that match the query passed, along with the fields chosen.
All the different fields are:
- expansions (list[str] | str) – expansions
- media_fields (list[str] | str) – media_fields
- place_fields (list[str] | str) – place_fields
- poll_fields (list[str] | str) – poll_fields
- tweet_fields (list[str] | str) – tweet_fields
- user_fields (list[str] | str) – user_fields
- threaded (bool) – Whether or not to use a thread to run the stream
💡 Refer to the official documentation
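A rule value does not have to be a bare keyword. As a rough sketch, the helper below combines a keyword with two filtered-stream operators, lang: and -is:retweet. The helper name build_rule is hypothetical, and the 512-character limit is an assumption that depends on your access level, so check the official rule documentation before relying on it.

```python
MAX_RULE_LENGTH = 512  # assumed limit; verify it for your access level


def build_rule(keyword, lang=None, exclude_retweets=True):
    """Combine a keyword with optional operators into one rule string."""
    parts = [keyword]
    if lang:
        parts.append(f"lang:{lang}")
    if exclude_retweets:
        parts.append("-is:retweet")
    rule = " ".join(parts)
    if len(rule) > MAX_RULE_LENGTH:
        raise ValueError(
            f"rule is {len(rule)} characters, limit is {MAX_RULE_LENGTH}"
        )
    return rule


print(build_rule("Spiderman", lang="en"))  # prints: Spiderman lang:en -is:retweet
```

The returned string can be wrapped in tweepy.StreamRule and passed to add_rules in the same way as the plain search_query argument.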
Let's try out our app
To test if everything is working, we pass Spiderman as the argument while running the main.py file.
$ python main.py Spiderman
This will create a tweets.db file inside the database directory.
And if you view the tweets.db file, you will find a table with username and tweet as its columns.
| username | tweet |
| username | some_tweets_about_spiderman |
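One way to look at the table without a separate SQLite viewer is a short query script. This sketch assumes the schema from step 4. The function name fetch_tweets is hypothetical, and an in-memory database with one made-up row stands in for ./database/tweets.db so that the snippet runs on its own.

```python
import sqlite3


def fetch_tweets(conn, limit=10):
    """Return up to `limit` (username, tweet) rows from the tweets table."""
    cursor = conn.execute(
        "SELECT username, tweet FROM tweets LIMIT ?", (limit,)
    )
    return cursor.fetchall()


# In-memory stand-in; use sqlite3.connect("./database/tweets.db") for real data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (username TEXT, tweet TEXT)")
conn.execute(
    "INSERT INTO tweets VALUES (?, ?)",
    ("some_user", "a tweet about Spiderman"),  # made-up sample row
)

for username, tweet in fetch_tweets(conn):
    print(f"{username} | {tweet}")
```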
Conclusion
This is an example showing how to use the Twitter V2 API with Python using the Tweepy library to get live tweets and store them in a database. You can also use csv or json files to store tweets.
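For the csv option, a minimal sketch with the standard csv module could look like this. The function name rows_to_csv is hypothetical, and the sample row is made up. The csv writer takes care of quoting, which matters because tweets often contain commas.

```python
import csv
import io


def rows_to_csv(rows):
    """Serialize (username, tweet) pairs to CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["username", "tweet"])
    writer.writerows(rows)
    return buffer.getvalue()


# The comma inside the tweet text is quoted automatically.
csv_text = rows_to_csv([("some_user", "Spiderman, again")])
```

Writing csv_text to a file, or passing an open file to csv.writer instead of the StringIO buffer, would give a tweets.csv next to the database.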
I will keep adding more blogs to this series.
🤝Follow me on Twitter and Hashnode
🌎Explore, 🎓Learn, 👷♂️Build.
Happy Coding💛