Leveraging Social Media for Operational Threat Intelligence. Part Deux

Implementing Further filtering

This part 2 of my series, the first part can be read hereLeveraging Social Media for Operational Threat Intelligence

Now that you have the ability to connect to twitter and directly pull down tweets, we want to start building out the functionality that will identify which of these tweets are the most relevant to us. Keep in mind that what we’re working on is actually a second layer of filtering, we set up the first filter when we first opened up the streamer, which means Twitter will only send us tweets based on the terms or users we provided.

Now we want to see if any of the specific statuses are pertinent to us. There’s multiple ways that we can do this, what we’ll do is look at specific fields within the tweets themselves and see if there’s a value in there that we care about. To get a full list of all the possible fields associated with the tweets, you can find all of them on the Twitter development website.


Status(quote_count=0, contributors=None, truncated=False, text=u'hello worlds \n\nhttps://t.co/ljTv72TLxm', is_quote_status=False, in_reply_to_status_id=[redacted], reply_count=0, id=906254180414783489, favorite_count=0, _api=<tweepy.api.API object at 0x10309c350>, source=u'Twitter Web Client', _json={u'quote_count': 0, u'contributors': None, u'truncated': False, u'text': u'hello worlds \n\nhttps://t.co/ljTv72TLxm', u'is_quote_status': False, u'in_reply_to_status_id': [redacted], u'reply_count': 0, u'id': 906254180414783489, u'favorite_count': 0, u'source': u'<a href="http://twitter.com&quot; rel="nofollow">Twitter Web Client</a>', u'retweeted': False, u'coordinates': None, u'timestamp_ms': u'1504902808656', u'entities': {u'user_mentions': [], u'symbols': [], u'hashtags': [], u'urls': [{u'url': u'https://t.co/ljTv72TLxm&#39;, u'indices': [15, 38], u'expanded_url': u'http://example.com&#39;, u'display_url': u'example.com'}]}, u'in_reply_to_screen_name': u'[redacted]', u'id_str': u'[redacted]', u'retweet_count': 0, u'in_reply_to_user_id': [redacted], u'favorited': False, u'user': {u'follow_request_sent': None, u'profile_use_background_image': True, u'default_profile_image': False, u'id': [redacted], u'default_profile': False, u'verified': False, u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'profile_sidebar_fill_color': u'000000', u'profile_text_color': u'000000', u'followers_count': 3, u'profile_sidebar_border_color': u'000000', u'id_str': u'4508967437', u'profile_background_color': u'C0DEED', u'listed_count': 0, u'profile_background_image_url_https': u'https://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'utc_offset': None, u'statuses_count': 2, u'description': None, u'friends_count': 0, u'location': None, u'profile_link_color': u'0084B4', u'profile_image_url': u'http://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'following': None, u'geo_enabled': False, u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/4508967437/1450318733&#39;, u'profile_background_image_url': u'http://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'name': u'[redacted]', u'lang': u'en', u'profile_background_tile': False, u'favourites_count': 0, u'screen_name': u'[redacted]', u'notifications': None, u'url': None, u'created_at': u'Thu Dec 17 01:57:27 +0000 2015', u'contributors_enabled': False, u'time_zone': None, u'protected': False, u'translator_type': u'none', u'is_translator': False}, u'geo': None, u'in_reply_to_user_id_str': u'[redacted]', u'possibly_sensitive': False, u'lang': u'en', u'created_at': u'Fri Sep 08 20:33:28 +0000 2017', u'filter_level': u'low', u'in_reply_to_status_id_str': u'[redacted]', u'place': None}, coordinates=None, timestamp_ms=u'1504902808656', entities={u'user_mentions': [], u'symbols': [], u'hashtags': [], u'urls': [{u'url': u'https://t.co/ljTv72TLxm&#39;, u'indices': [15, 38], u'expanded_url': u'http://example.com&#39;, u'display_url[redacted] u'example.com'}]}, in_reply_to_screen_name=u'[redacted]', in_reply_to_user_id=[redacted], retweet_count=0, id_str=u'[redacted]', favorited=False, source_url=u'http://twitter.com&#39;, user=User(follow_request_sent=None, profile_use_background_image=True, _json={u'follow_request_sent': None, u'profile_use_background_image': True, u'default_profile_image': False, u'id': [redacted], u'default_profile': False, u'verified': False, u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'profile_sidebar_fill_color': u'000000', u'profile_text_color': u'000000', u'followers_count': 3, u'profile_sidebar_border_color': u'000000', u'id_str': u'[redacted]', u'profile_background_color': u'C0DEED', u'listed_count': 0, u'profile_background_image_url_https': u'https://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'utc_offset': None, u'statuses_count': 2, u'description': None, u'friends_count': 0, u'location': None, u'profile_link_color': u'0084B4', u'profile_image_url': u'http://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'following': None, u'geo_enabled': False, u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/4508967437/1450318733&#39;, u'profile_background_image_url': u'http://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'name': u'[redacted]', u'lang': u'en', u'profile_background_tile': False, u'favourites_count': 0, u'screen_name': u'[redacted]', u'notifications': None, u'url': None, u'created_at': u'Thu Dec 17 01:57:27 +0000 2015', u'contributors_enabled': False, u'time_zone': None, u'protected': False, u'translator_type': u'none', u'is_translator': False}, id=4508967437, _api=<tweepy.api.API object at 0x10309c350>, verified=False, translator_type=u'none', profile_image_url_https=u'https://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, profile_sidebar_fill_color=u'000000', is_translator=False, geo_enabled=False, profile_text_color=u'000000', followers_count=3, protected=False, location=None, default_profile_image=False, id_str=u'4508967437', utc_offset=None, statuses_count=2, description=None, friends_count=0, profile_link_color=u'0084B4', profile_image_url=u'http://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, notifications=None, profile_background_image_url_https=u'https://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, profile_background_color=u'C0DEED', profile_banner_url=u'https://pbs.twimg.com/profile_banners/4508967437/1450318733&#39;, profile_background_image_url=u'http://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, screen_name=u'[redacted]', lang=u'en', profile_background_tile=False, favourites_count=0, name=u'[redacted]', url=None, created_at=datetime.datetime(2015, 12, 17, 1, 57, 27), contributors_enabled=False, time_zone=None, profile_sidebar_border_color=u'000000', default_profile=False, following=False, listed_count=0), geo=None, in_reply_to_user_id_str=u'[redacted]', possibly_sensitive=False, lang=u'en', created_at=datetime.datetime(2017, 9, 8, 20, 33, 28), author=User(follow_request_sent=None, profile_use_background_image=True, _json={u'follow_request_sent': None, u'profile_use_background_image': True, u'default_profile_image': False, u'id': [redacted], u'default_profile': False, u'verified': False, u'profile_image_url_https': u'https://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'profile_sidebar_fill_color': u'000000', u'profile_text_color': u'000000', u'followers_count': 3, u'profile_sidebar_border_color': u'000000', u'id_str': u'[redacted]', u'profile_background_color': u'C0DEED', u'listed_count': 0, u'profile_background_image_url_https': u'https://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'utc_offset': None, u'statuses_count': 2, u'description': None, u'friends_count': 0, u'location': None, u'profile_link_color': u'0084B4', u'profile_image_url': u'http://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, u'following': None, u'geo_enabled': False, u'profile_banner_url': u'https://pbs.twimg.com/profile_banners/4508967437/1450318733&#39;, u'profile_background_image_url': u'http://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, u'name': u'[redacted]', u'lang': u'en', u'profile_background_tile': False, u'favourites_count': 0, u'screen_name': u'[redacted]', u'notifications': None, u'url': None, u'created_at': u'Thu Dec 17 01:57:27 +0000 2015', u'contributors_enabled': False, u'time_zone': None, u'protected': False, u'translator_type': u'none', u'is_translator': False}, id=4508967437, _api=<tweepy.api.API object at 0x10309c350>, verified=False, translator_type=u'none', profile_image_url_https=u'https://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, profile_sidebar_fill_color=u'000000', is_translator=False, geo_enabled=False, profile_text_color=u'000000', followers_count=3, protected=False, location=None, default_profile_image=False, id_str=u'[redacted]', utc_offset=None, statuses_count=2, description=None, friends_count=0, profile_link_color=u'0084B4', profile_image_url=u'http://pbs.twimg.com/profile_images/677308543389732864/OlTnHjxI_normal.jpg&#39;, notifications=None, profile_background_image_url_https=u'https://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, profile_background_color=u'C0DEED', profile_banner_url=u'https://pbs.twimg.com/profile_banners/4508967437/1450318733&#39;, profile_background_image_url=u'http://pbs.twimg.com/profile_background_images/677311627830558720/3ZWlw0ek.jpg&#39;, screen_name=u'[redacted]', lang=u'en', profile_background_tile=False, favourites_count=0, name=u'[redacted]', url=None, created_at=datetime.datetime(2015, 12, 17, 1, 57, 27), contributors_enabled=False, time_zone=None, profile_sidebar_border_color=u'000000', default_profile=False, following=False, listed_count=0), filter_level=u'low', in_reply_to_status_id_str=u'[redacted]', place=None, retweeted=False)

view raw

Tweet_example

hosted with ❤ by GitHub

As you can see tweets consist of various different fields, for a second tier of filtering, we’re interested in looking at domains and twitter account mentions in the tweet. Each of these will require that we look at different fields in the status and use it to compare to a list of another criteria and if that criteria are met, we want some action to occur. The specifics of what the action will occur, aka what will happen when a test comes up true will be discussed in part 3 of this series, where we’ll roll out a way for notifications to be sent. At this specific step we’ll be developing the logic tests to make the determination whether or not an action should be taken.

We will contain our logic tests in simple functions whose sole purpose is to return a TRUE or FLASE dependent on whether or not the test meets a criterion. Since we’re looking at two unique tests, we’ll be writing two unique functions which will then be called by the def on-status method

The values that will be tested against will be stored as part of the Listener class, so we will also be adding functions that will handle the loading of this data. For our super simple example, we’ll just assume that the data is found in a text file which we can simply read from. In your case, you may already have a database that contains that information that you could theoretically pull from. But since we’re looking for a simple proof of concept we’re just going to go with the simplest example.

 

Loading the values
To see if a value exists in a status, you’ll first need to load it…obviously. In this case we’re assuming that you have the following information in the following formats. You’re going to need the following:

  • a text file of “good guy” twitter accounts, these are the twitter accounts that might be associated with your organization + potentially your partners.
    • We’ll say this is stored in a simple text file
  • A text file of domains you want to monitor
    • We’ll say this is stored in a simple text file

 

To make our directory a little bit cleaner, I’ll be creating a ‘data’ folder that I’ll be inputting all of these files.

 

We will want to load these values on the start of the Listener class, so we’ll add the call of our function into the __init__ method.


class Listener(StreamListener):
def __init__(self, api=None):
# This method used to define what needs to be done
# before the class can be actually used
# In our case, we're making sure the api is set up
# and loading the values we need for our tests
self.api = api or tweepy.API()
self.domains = load_domains()
self.twitter_accounts = load_twitter_accounts()
# simple proof of concept
def on_status(self, status):
# This is the method the processes the statuses
# sent by twitter, in our case we want to check
# the statuses against our tests
if self.domain_test(status=status):
# This is the function we want to call in
# when we find a match 🙂
twitter_hit(status)
if self.user_mention(status=status):
# This is the function we want to call in
# when we find a match 🙂
twitter_hit(status)

view raw

logic_test

hosted with ❤ by GitHub

 

Our two loading files follow the same process (these probably could combined into one function, but screw it I don’t mind doing the occasional ctrl-c ctrl-v style programming.

Domains

The domain loader function will load the domains from the domain.txt file which will be included in our example file, while the twitter account loader will do the same for the twitter accounts we want to identify for mentions. The mentioned twitter accounts are different than the bad guy accounts since we want to identify when they’re being mentioned, not necessarily when they themselves are tweeting.


def load_domains():
# start an empty array
domains = []
# A little status information for us
print "[] Loading domains"
# We want our code to platform neutral, so use os.path.join
# to get to our data directory and extract out domains
file_location = os.path.join('data','domains.txt')
# First check to see if the file is actually there
if os.path.isfile(file_location):
# Open the file and ready each line
with open(file_location) as f:
for domain in f.readlines():
# lets strip out newline characters + lower them
dom =domain.strip('\n').strip('\r').lower()
domains.append(str(dom))
# At the end lets provide a status that
# tells us how many domains we got
print "[x] %s Domains Loaded" % len(domains)
else:
print '][ Failed to load domains, File not found'
# return to new array of domains
return domains

view raw

domain_loader

hosted with ❤ by GitHub

Working through the domain comparison. For our case we’re interested in ONLY the domain since our list consists of only domains. To allow us to compare apples to apples, we need to find a way that we can extract only the domains, regardless of the protocol, and regardless of which resource is being associated[URI]. This is where we introduce our second dependency, urlparse, this great library allows you to extract different components of a URL by parsing them into an object in which you can then call upon the different components of the URL.

 

On gist below, you can see us calling the function urlparse to the URL object found in the mentioned piece of the status object. Once it’s parse, we will then extract just the domain by calling the netloc method. After that, we got to convert that back to a string, make it lowercase (cause remember we did the same thing with our domain list, lowercase them all, so we can compare apples to apples) and lastly we ask if “in” our current array of domains. This means that the results array will contain a list of a TRUE or FALSE, telling us if any of the mentioned URLs are one of ours.  At the end we don’t want to know how many of our domains are mentioned, we just want to know IF any of our domains are found. So lastly, we call the “any” function on the results array to identify if any of our results within the array are “True”.


def domain_test(self,status):
# begin by making an empty array that will store our TRUE/FALSE responses
results = []
# first lets check to see if we got URLs in our Entities object of the status
if status.entities['urls']:
# In a status we'll want to check all the urls that might be there
# so we create a very quick for loop
for domain in status.entities['urls']:
# A check to see if there's anything in our expanded url
# Twitter automatically converts MOST [if not all] urls
# into their twitter url shorting service the 't.co'
# this is to save space on the actual tweet itself,
# however, the full expanded is still stored with the status.
# So that's where we'll grab it
if domain['expanded_url']:
# This is the meat of the script
# This part helps us determine if the expanded url is in our domain list
# It then adds a TRUE or FALSE to results
# If you want to test to see if this section is working,
# you can change this logic to just print the values, instead of storing them
# but caution, you'll be getting lots of hits depending on your track
results.append(str(urlparse(domain['expanded_url']).netloc).lower() in self.domains)
else:
pass
else:
pass
# We now return if there's any TRUE's in our array
return any(results)

view raw

domain_test

hosted with ❤ by GitHub

Username mentions

To check to see if the our twitter accounts are in the mentions, we’ll want to go through the same essential process. Go through the User-mention dictionary + cycle through the user screen names and see if they’re in our list.


def user_mention(self, status):
# Once again we create an empty array
results = []
# Here we want to check the user mentions component of the status
# So the first step is to identify if it's empty or not
if status.entities['user_mentions']:
# Multiple users can be mentioned in a tweet,
# so we'll want to build a for-loop
for user in status.entities['user_mentions']:
# We lower our results like we lowered our input to compare them
# For our case, we're pulling out the value of screen_name, this is
# based on the assumption that you're tracking the screen names of the user's.
# Alternatively you could use their id, but you'd need to set up a process to get
# the ids first.
results.append(str(user['screen_name']).lower() in self.twitter_accounts)
else:
pass
# We want to return if there's any that true in our array
return any(results)

Lastly, we want to define a super simple action for us to take if the tests turn up true, in our case, we’ll just print that a hit was found. Here’s mine:


def twitter_hit(status):
# This is a super simple indicator that we found something,
# and the logic will be built up in the next one
print '[x] Hit found \n'

view raw

twitter_tut_hit

hosted with ❤ by GitHub

And there you have it, you have a way to check the tweet statuses based on two simple criteria. Feel free to try other ones and build off this simple foundation. Stay tuned for the next one, as we’ll go into detail in terms of how to create notification emails based on the hits.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s