Now that I’m more or less done with school (getting my bachelor’s in CS! yay!) I can finally dedicate a bit more time to this. A few weeks ago, I threw together a simple word counter for a single account’s archive.json. Now I’d like to expand that out to a larger dataset; namely, the post.csv from community archive’s huggingface.
First things first, I actually have to load the rather sizable file. I’ve used pandas in the past for machine learning, so I’m pretty sure that’ll work:
import pandas as pd
data = pd.read_csv('../post.csv')
data.info()
This tells me what the columns are, along with some other information:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6135978 entries, 0 to 6135977
Data columns (total 14 columns):
 #   Column             Dtype 
---  ------             ----- 
 0   tweet_id           object
 1   account_id         object
 2   created_at         object
 3   full_text          object
 4   retweet_count      object
 5   favorite_count     object
 6   reply_to_tweet_id  object
 7   reply_to_user_id   object
 8   reply_to_username  object
 9   archive_upload_id  object
 10  updated_at         object
 11  username           object
 12  temporal_subset    object
 13  topic              object
dtypes: object(14)
memory usage: 655.4+ MB
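Everything here, including the timestamps and the engagement counts, was read in with the generic object dtype, so before grouping by date I want to convert the columns I actually care about into proper types. A rough sketch of what I mean (using errors='coerce' to handle malformed rows is just my assumption):

# Parse the timestamp strings into real datetimes; unparseable values become NaT
data["created_at"] = pd.to_datetime(data["created_at"], errors="coerce")

# The engagement counts are strings too; make them numeric so they can be summed later
for col in ["retweet_count", "favorite_count"]:
    data[col] = pd.to_numeric(data[col], errors="coerce")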
That’s a lot of data! It’s probably enough that it’s worth avoiding regular Python loops; in fact, I probably want to avoid using Python altogether. As a first test, I search for a word and count its occurrences per day so I can plot them over time:
import matplotlib.pyplot as plt
import datetime as dt

word = "election"
start_date = dt.date(2020, 1, 1)
end_date = dt.date(2022, 1, 1)

# Count whole-word matches in each tweet, then sum the matches per calendar day
# (created_at was converted to datetime above, so the .dt accessor works)
data["matches"] = data["full_text"].str.lower().str.count(rf"\b{word}\b")
daily = data.groupby(data["created_at"].dt.date)["matches"].sum()

graph = daily.plot()
graph.set_xlim(pd.to_datetime(start_date), pd.to_datetime(end_date))
plt.show()
This takes roughly ten seconds to run. Now, I haven’t used pandas much, so AI wrote a lot of this and I’m not sure it’s the most efficient approach, but spending several seconds per word really isn’t going to work if I also want to search for spikes in word usage or anything like that. One of my attempts at it ran for six hours before I cancelled it:
Clearly I should try something else. Coincidentally, I’ve been trying to get started with OpenSearch for an internship, and while I’m still a little fuzzy on what exactly it’s for, it does sound like it’d be suitable for my purposes.
First off, I need to get an instance of OpenSearch up and running. They provide a quickstart using Docker, but the example compose file uses a couple of nodes and a security plugin that I don’t need, so I’ll be using this modified version instead:
services:
  opensearch:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-single
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-single
      - discovery.type=single-node # Single node discovery
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx16g" # Increase memory for handling multiple GB of tweet data, this is probably overkill
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD:-admin123} # Default password for local dev, doesn't currently use it
      - "DISABLE_INSTALL_DEMO_CONFIG=true" # Disable demo config for cleaner setup
      - "DISABLE_SECURITY_PLUGIN=true" # Disable security for easier local development
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data:/usr/share/opensearch/data
    ports:
      - 9200:9200 # REST API
      - 9600:9600 # Performance Analyzer
    networks:
      - opensearch-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]' # Updated to use single node without HTTPS
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true" # Disable security for easier local development
    networks:
      - opensearch-net

volumes:
  opensearch-data:

networks:
  opensearch-net:
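With the security plugin disabled, the REST API is plain HTTP on port 9200, so once the container is running a quick sanity check is to hit the root endpoint. A minimal sketch using Python’s requests library (curl works just as well):

import requests

# Hit the REST API root of the single-node cluster from the compose file above
resp = requests.get("http://localhost:9200")
resp.raise_for_status()

info = resp.json()
print(info["cluster_name"])        # should match cluster.name from the compose file
print(info["version"]["number"])   # the OpenSearch version that's actually running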
After bringing this up with `docker compose up`, I use this script (mostly written with the new [Gemini CLI](https://github.com/google-gemini/gemini-cli)) to ingest all my data:
import { createReadStream } from 'fs';
import { parse } from 'csv-parse';
import { Client } from '@opensearch-project/opensearch';
const BATCH_SIZE = 1000;
const INDEX_NAME = 'posts';
// Configure the OpenSearch client
// Connects to the 'opensearch' service from docker-compose.yml
const client = new Client({
node: process.env.OPENSEARCH_URL || 'http://localhost:9200',
});
async function ingestCSV(filePath: string) {
console.log(`Starting ingestion of ${filePath} into index "${INDEX_NAME}"...`);
const parser = createReadStream(filePath).pipe(
parse({
columns: true, // Use the first row as headers
trim: true,
})
);
let batch: any[] = [];
let rowCount = 0;
// Check if the index exists, and create it if it doesn't
const { body: indexExists } = await client.indices.exists({ index: INDEX_NAME });
if (!indexExists) {
console.log(`Index "${INDEX_NAME}" does not exist. Creating...`);
await client.indices.create({
index: INDEX_NAME,
body: {
mappings: {
properties: {
tweet_id: { type: 'keyword' },
account_id: { type: 'keyword' },
created_at: { type: 'date' }, // Use default date mapping
full_text: { type: 'text' },
retweet_count: { type: 'integer' },
favorite_count: { type: 'integer' },
reply_to_tweet_id: { type: 'keyword' },
reply_to_user_id: { type: 'keyword' },
reply_to_username: { type: 'keyword' },
username: { type: 'keyword' },
},
},
},
});
console.log(`Index "${INDEX_NAME}" created.`);
} else {
console.log(`Index "${INDEX_NAME}" already exists.`);
}
for await (const record of parser) {
// Convert created_at to ISO 8601 format before indexing
if (record.created_at) {
try {
// The Date constructor can parse 'YYYY-MM-DD HH:MM:SS+ZZ:ZZ'
record.created_at = new Date(record.created_at).toISOString();
} catch (e) {
console.error(`Could not parse date: ${record.created_at}`);
// Set to null if parsing fails. OpenSearch will reject if the field is not nullable.
record.created_at = null;
}
}
batch.push({ index: { _index: INDEX_NAME } });
batch.push(record);
rowCount++;
if (batch.length >= BATCH_SIZE * 2) {
await sendBulkRequest(batch);
console.log(`Ingested ${rowCount} rows...`);
batch = [];
}
}
// Ingest any remaining documents
if (batch.length > 0) {
await sendBulkRequest(batch);
}
console.log(`Finished ingestion.`);
console.log(`Total rows processed: ${rowCount}`);
// Refresh the index to make the documents searchable
await client.indices.refresh({ index: INDEX_NAME });
const countResponse = await client.count({ index: INDEX_NAME });
console.log(`Total documents in index "${INDEX_NAME}": ${countResponse.body.count}`);
}
async function sendBulkRequest(batch: any[]) {
try {
const response = await client.bulk({
body: batch,
});
if (response.body.errors) {
console.error('Bulk ingestion had errors:');
// Log detailed information for failed items
response.body.items.forEach((item: any) => {
if (item.index && item.index.error) {
console.error(`Error for item ${item.index._id}:`, item.index.error);
}
});
}
} catch (error) {
console.error('Error during bulk ingestion:', error);
// It might be useful to exit or implement a retry mechanism here
process.exit(1);
}
}
// --- Script execution ---
async function main() {
const filePath = process.argv[2];
if (!filePath) {
console.error('Please provide the path to the CSV file.');
console.error('Usage: bun ingest_csv.ts <path-to-csv-file>');
process.exit(1);
}
try {
await ingestCSV(filePath);
} catch (error) {
console.error('An unexpected error occurred:', error);
process.exit(1);
}
}
main();
Now, I’m even less familiar with TypeScript and OpenSearch than I am with pandas, but this does actually seem to work. All that’s left is to query OpenSearch and make sure the data is in there:
curl -X GET "http://localhost:9200/posts/_search?pretty" -H "Content-Type: application/json" -d '{"size": 3}'
Interestingly, the response indicates that OpenSearch automatically created mappings for the columns I didn’t define in my script, which is neat. And since the data is in there, I can use the OpenSearch Dashboards UI to generate graphs:
Searching for the word “election”, there’s a spike every October or so in years with a US presidential election, so it seems to be working as expected. More importantly, I can search for new words almost instantly; the logs show requests being handled in tens of milliseconds, which is orders of magnitude faster than my (likely very poor) pandas implementation. Nice!
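For anything the dashboard doesn’t cover, the same word-over-time question can be asked directly against the search API with a match query plus a date_histogram aggregation. A minimal sketch using Python’s requests library (the index and field names are the ones from my ingestion script):

import requests

word = "election"

# Bucket matching tweets per day: filter on the analyzed full_text field,
# then aggregate the hits by created_at with a daily date_histogram
query = {
    "size": 0,  # only the aggregation matters, skip returning the hits themselves
    "query": {"match": {"full_text": word}},
    "aggs": {
        "per_day": {
            "date_histogram": {"field": "created_at", "calendar_interval": "day"}
        }
    },
}

resp = requests.post("http://localhost:9200/posts/_search", json=query)
resp.raise_for_status()

for bucket in resp.json()["aggregations"]["per_day"]["buckets"]:
    print(bucket["key_as_string"], bucket["doc_count"])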