Unix Timestamp Examples
Real-world scenarios and practical code examples to help you master Unix timestamp operations in your applications.
Basic Conversions
Getting Current Time
The most fundamental operation - getting the current Unix timestamp in different programming languages.
// JavaScript - Current timestamp in seconds
const now = Math.floor(Date.now() / 1000);
console.log(now); // e.g. 1703980800

// JavaScript - Current timestamp in milliseconds
const nowMs = Date.now();
console.log(nowMs); // e.g. 1703980800000

// Higher-resolution timestamp (microseconds)
// Date.now() only has millisecond resolution, so multiplying it by 1000 would
// merely append three zeros. performance.timeOrigin + performance.now() gives
// fractional milliseconds since the Unix epoch instead (runtimes may coarsen
// the resolution for security reasons).
const nowMicros = Math.floor((performance.timeOrigin + performance.now()) * 1000);
console.log(nowMicros); // e.g. 1703980800000123
# Python - Current timestamp
import time
from datetime import datetime, timezone

# Using time module
now = int(time.time())
print(now)  # e.g. 1703980800

# Using datetime module (timezone-aware, so the local timezone does not matter)
now = int(datetime.now(timezone.utc).timestamp())
print(now)  # e.g. 1703980800

# High precision: integer nanoseconds since the epoch
precise_now = time.time_ns()
print(precise_now)  # Nanosecond precision (19 digits)
Date to Timestamp
Convert specific dates to Unix timestamps for storage and comparison.
// JavaScript - Convert specific dates to timestamps

// From date string
const date1 = new Date('2023-12-31T00:00:00Z');
const timestamp1 = Math.floor(date1.getTime() / 1000);
console.log(timestamp1); // 1703980800

// From date components
const date2 = new Date(Date.UTC(2023, 11, 31, 0, 0, 0)); // Note: month is 0-indexed (11 = December)
const timestamp2 = Math.floor(date2.getTime() / 1000);
console.log(timestamp2); // 1703980800
// Using a utility function
/**
 * Convert a date string to a Unix timestamp in whole seconds.
 *
 * Date-only ISO strings ('2023-01-01') and strings ending in 'Z' are parsed
 * as UTC; an ISO date-time without an offset ('2023-01-01T12:00:00') is
 * parsed in the runtime's local timezone.
 *
 * @param {string} dateString - Any string accepted by the Date constructor.
 * @returns {number} Seconds since 1970-01-01T00:00:00Z.
 * @throws {Error} If the string cannot be parsed as a date.
 */
function dateToTimestamp(dateString) {
  const ms = new Date(dateString).getTime();
  // An unparseable string produces an "Invalid Date" whose time value is NaN
  if (Number.isNaN(ms)) {
    throw new Error('Invalid date string');
  }
  return Math.floor(ms / 1000);
}
// Examples
console.log(dateToTimestamp('2023-01-01')); // 1672531200
console.log(dateToTimestamp('2024-01-01T23:59:59Z')); // 1704153599

// Batch conversion
const dates = ['2023-01-01', '2023-06-15', '2023-12-31'];
const timestamps = dates.map(dateToTimestamp);
console.log(timestamps); // [1672531200, 1686787200, 1703980800]
Timestamp to Date
Convert Unix timestamps back to human-readable dates in various formats.
from datetime import datetime, timezone
import time


def timestamp_to_date(timestamp, format_string='%Y-%m-%d %H:%M:%S'):
    """Convert a Unix timestamp to a formatted UTC date string.

    Args:
        timestamp: Seconds since the Unix epoch (int or float).
        format_string: strftime() format used for the result.

    Returns:
        The formatted date in UTC, or a string starting with "Error: " when
        the timestamp cannot be converted.
    """
    try:
        # Interpret the timestamp in UTC so the output is machine-independent
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return dt.strftime(format_string)
    except (ValueError, OverflowError, OSError) as e:
        # fromtimestamp() raises OverflowError (not ValueError) when the value
        # does not fit the platform's time_t, so it must be caught as well
        return f"Error: {e}"
# Examples
timestamp = 1704067200

# Different formats
print(timestamp_to_date(timestamp))  # 2024-01-01 00:00:00
print(timestamp_to_date(timestamp, '%B %d, %Y'))  # January 01, 2024
print(timestamp_to_date(timestamp, '%Y-%m-%d'))  # 2024-01-01
print(timestamp_to_date(timestamp, '%I:%M %p on %B %d, %Y'))  # 12:00 AM on January 01, 2024
# Batch conversion with different timezones
import pytz


def batch_convert_with_timezone(timestamps, tz_name='UTC'):
    """Convert multiple timestamps to dates in the specified timezone.

    Args:
        timestamps: Iterable of Unix timestamps in seconds.
        tz_name: Timezone name understood by pytz.timezone().

    Returns:
        A list with one dict per timestamp holding the original 'timestamp',
        the formatted 'utc' and 'local' times, and the 'timezone' name.
    """
    tz = pytz.timezone(tz_name)
    results = []

    for ts in timestamps:
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
        local_dt = dt.astimezone(tz)
        results.append({
            'timestamp': ts,
            'utc': dt.strftime('%Y-%m-%d %H:%M:%S UTC'),
            # Append the zone name after formatting so that a '%' inside
            # tz_name can never be interpreted as a strftime directive
            'local': f"{local_dt.strftime('%Y-%m-%d %H:%M:%S')} {tz_name}",
            'timezone': tz_name
        })

    return results
# Example usage
timestamps = [1704067200, 1704153600, 1704240000]
results = batch_convert_with_timezone(timestamps, 'US/Eastern')
for result in results:
    print(f"{result['timestamp']} -> {result['local']}")
Time Calculations
Time Differences
Calculate the difference between two timestamps to measure duration or elapsed time.
/**
 * Static helpers for measuring durations between Unix timestamps.
 * All timestamps are whole seconds since the Unix epoch.
 */
class TimeCalculator {
  /**
   * Resolve an optional timestamp argument.
   * Only null/undefined fall back to the current time, so 0 (the epoch
   * itself) is treated as a real timestamp.
   * @param {?number} timestamp
   * @returns {number}
   */
  static _resolve(timestamp) {
    return timestamp == null ? Math.floor(Date.now() / 1000) : timestamp;
  }

  /**
   * Absolute difference between two timestamps, expressed in several units.
   * @param {number} timestamp1
   * @param {?number} [timestamp2=null] - Defaults to the current time.
   * @returns {{seconds: number, minutes: number, hours: number, days: number,
   *            weeks: number, months: number, years: number}}
   *   Each field is the whole difference in that unit, not a breakdown.
   */
  static getTimeDifference(timestamp1, timestamp2 = null) {
    const diffSeconds = Math.abs(TimeCalculator._resolve(timestamp2) - timestamp1);

    return {
      seconds: diffSeconds,
      minutes: Math.floor(diffSeconds / 60),
      hours: Math.floor(diffSeconds / 3600),
      days: Math.floor(diffSeconds / 86400),
      weeks: Math.floor(diffSeconds / 604800),
      months: Math.floor(diffSeconds / 2592000), // Approximate (30-day months)
      years: Math.floor(diffSeconds / 31536000) // Approximate (365-day years)
    };
  }

  /**
   * Human-readable "time ago" label.
   * @param {number} timestamp - The moment being described.
   * @param {?number} [now=null] - Reference time; defaults to the current time.
   * @returns {string} e.g. 'just now', '1 hour ago', '3 days ago'.
   *   Timestamps less than a minute old (or in the future) yield 'just now'.
   */
  static formatTimeAgo(timestamp, now = null) {
    const diff = TimeCalculator._resolve(now) - timestamp;
    // Use the singular unit for exactly 1 ("1 minute ago", not "1 minutes ago")
    const ago = (count, unit) => `${count} ${unit}${count === 1 ? '' : 's'} ago`;

    if (diff < 60) return 'just now';
    if (diff < 3600) return ago(Math.floor(diff / 60), 'minute');
    if (diff < 86400) return ago(Math.floor(diff / 3600), 'hour');
    if (diff < 604800) return ago(Math.floor(diff / 86400), 'day');
    if (diff < 2592000) return ago(Math.floor(diff / 604800), 'week');
    if (diff < 31536000) return ago(Math.floor(diff / 2592000), 'month');
    return ago(Math.floor(diff / 31536000), 'year');
  }

  /**
   * Age derived from a birth timestamp.
   * @param {number} birthTimestamp
   * @param {?number} [now=null] - Reference time; defaults to the current time.
   * @returns {{years: number, totalSeconds: number, totalDays: number}}
   */
  static calculateAge(birthTimestamp, now = null) {
    const current = TimeCalculator._resolve(now);
    const ageInSeconds = current - birthTimestamp;

    // Count completed calendar years (UTC). Dividing by a fixed 365-day year
    // ignores leap days and rolls the age over several days too early.
    const birth = new Date(birthTimestamp * 1000);
    const today = new Date(current * 1000);
    let ageInYears = today.getUTCFullYear() - birth.getUTCFullYear();
    const beforeBirthday =
      today.getUTCMonth() < birth.getUTCMonth() ||
      (today.getUTCMonth() === birth.getUTCMonth() &&
        today.getUTCDate() < birth.getUTCDate());
    if (beforeBirthday) {
      ageInYears -= 1;
    }

    return {
      years: ageInYears,
      totalSeconds: ageInSeconds,
      totalDays: Math.floor(ageInSeconds / 86400)
    };
  }

  /**
   * Duration of a session broken down into hours, minutes and seconds.
   * @param {number} startTimestamp
   * @param {?number} [endTimestamp=null] - Defaults to the current time.
   * @returns {{total: number, formatted: string, hours: number,
   *            minutes: number, seconds: number}}
   */
  static getSessionDuration(startTimestamp, endTimestamp = null) {
    const duration = TimeCalculator._resolve(endTimestamp) - startTimestamp;

    const hours = Math.floor(duration / 3600);
    const minutes = Math.floor((duration % 3600) / 60);
    const seconds = duration % 60;

    return {
      total: duration,
      formatted: `${hours}h ${minutes}m ${seconds}s`,
      hours,
      minutes,
      seconds
    };
  }
}
// Usage examples
const postTimestamp = 1703980800; // December 31, 2023 00:00:00 UTC
const now = 1704024000; // December 31, 2023 12:00:00 UTC

// formatTimeAgo() measures against the real current time, so its output
// changes as time passes
console.log(TimeCalculator.formatTimeAgo(postTimestamp)); // e.g. "2 years ago"

// To compare two fixed timestamps, pass both to getTimeDifference()
console.log(TimeCalculator.getTimeDifference(postTimestamp, now).hours); // 12

const birthdate = 631152000; // January 1, 1990 00:00:00 UTC
console.log(TimeCalculator.calculateAge(birthdate)); // { years: 34, ... } when run during 2024

const sessionStart = 1703980800;
console.log(TimeCalculator.getSessionDuration(sessionStart)); // Current session duration
Date Arithmetic
Add or subtract time periods from timestamps for scheduling and planning.
import time
from datetime import datetime, timedelta, timezone


class DateArithmetic:
    """Timestamp arithmetic helpers; all calculations are done in UTC."""

    @staticmethod
    def add_time(timestamp, **kwargs):
        """Add time to a timestamp.

        Keyword arguments are passed to timedelta (days=, hours=, minutes=, ...).
        Returns the new timestamp as whole seconds.
        """
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return int((dt + timedelta(**kwargs)).timestamp())

    @staticmethod
    def subtract_time(timestamp, **kwargs):
        """Subtract time from a timestamp (same keyword arguments as add_time)."""
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return int((dt - timedelta(**kwargs)).timestamp())

    @staticmethod
    def start_of_day(timestamp):
        """Get the timestamp for the start of the UTC day (00:00:00)."""
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
        return int(midnight.timestamp())

    @staticmethod
    def end_of_day(timestamp):
        """Get the timestamp for the end of the UTC day (23:59:59)."""
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        last_second = dt.replace(hour=23, minute=59, second=59, microsecond=0)
        return int(last_second.timestamp())

    @staticmethod
    def next_monday(timestamp):
        """Get the timestamp for 00:00:00 UTC on the next Monday.

        When the timestamp already falls on a Monday, the Monday one week
        later is returned.
        """
        dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        # weekday() is 0 on Monday, so this is always between 1 and 7 days
        days_ahead = 7 - dt.weekday()
        target = (dt + timedelta(days=days_ahead)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        return int(target.timestamp())

    @staticmethod
    def generate_schedule(start_timestamp, interval_hours, count):
        """Generate a schedule of `count` timestamps spaced `interval_hours` apart.

        Each entry is a dict with 'sequence' (1-based), 'timestamp',
        'date' (ISO 8601, UTC) and 'days_from_start'.
        """
        schedule = []
        current = start_timestamp

        for i in range(count):
            schedule.append({
                'sequence': i + 1,
                'timestamp': current,
                'date': datetime.fromtimestamp(current, tz=timezone.utc).isoformat(),
                'days_from_start': (current - start_timestamp) // 86400
            })
            current = DateArithmetic.add_time(current, hours=interval_hours)

        return schedule
# Usage examples
base_timestamp = 1704067200  # January 1, 2024 00:00:00 UTC

# Add time
future = DateArithmetic.add_time(base_timestamp, days=7, hours=12)
print(f"7 days and 12 hours later: {future}")

# Start/end of day
print(f"Start of day: {DateArithmetic.start_of_day(base_timestamp)}")
print(f"End of day: {DateArithmetic.end_of_day(base_timestamp)}")

# Generate weekly meeting schedule
meetings = DateArithmetic.generate_schedule(base_timestamp, 168, 4)  # Every week (168 hours) for 4 weeks
for meeting in meetings:
    print(f"Meeting {meeting['sequence']}: {meeting['date']}")
# Reminder system
def create_reminders(event_timestamp, reminder_intervals):
    """Create reminder timestamps before an event.

    Args:
        event_timestamp: Unix timestamp of the event.
        reminder_intervals: List of dicts of timedelta keyword arguments,
            e.g. [{'days': 1}, {'hours': 2}].

    Returns:
        A list of reminder dicts sorted by 'timestamp', earliest first.
    """
    reminders = []
    for interval in reminder_intervals:
        reminder_time = DateArithmetic.subtract_time(event_timestamp, **interval)
        reminders.append({
            'timestamp': reminder_time,
            'description': f"Reminder {interval}",
            'event_timestamp': event_timestamp
        })
    return sorted(reminders, key=lambda x: x['timestamp'])
# Example: Create reminders for an event
event_time = DateArithmetic.add_time(base_timestamp, days=30)  # Event 30 days after base_timestamp
reminder_intervals = [
    {'days': 7},  # 1 week before
    {'days': 1},  # 1 day before
    {'hours': 2},  # 2 hours before
    {'minutes': 30}  # 30 minutes before
]

reminders = create_reminders(event_time, reminder_intervals)
for reminder in reminders:
    dt = datetime.fromtimestamp(reminder['timestamp'], tz=timezone.utc)
    print(f"Send reminder at: {dt.isoformat()}")
Real-World Scenarios
User Session Tracking
Track user login times, session duration, and activity patterns for analytics and security.
/**
 * In-memory tracker for user sessions.
 * All times are Unix timestamps in whole seconds.
 */
class SessionManager {
  constructor() {
    this.sessions = new Map();
    this.activityThreshold = 1800; // 30 minutes of inactivity
  }

  /**
   * Create a session for a user and log its 'session_start' action.
   * @param {string} userId
   * @param {Object} [metadata={}] - Extra fields are kept as-is; userAgent,
   *   ipAddress and location default to ''.
   * @returns {string} The new session id.
   */
  startSession(userId, metadata = {}) {
    const sessionId = this.generateSessionId();
    const timestamp = Math.floor(Date.now() / 1000);

    const session = {
      sessionId,
      userId,
      startTime: timestamp,
      lastActivity: timestamp,
      endTime: null,
      duration: null,
      active: true,
      pageViews: 0,
      actions: [],
      metadata: {
        // Spread first so the defaults below cannot be overwritten by an
        // explicitly undefined/null field in the caller's metadata
        ...metadata,
        userAgent: metadata.userAgent || '',
        ipAddress: metadata.ipAddress || '',
        location: metadata.location || ''
      }
    };

    this.sessions.set(sessionId, session);
    this.logActivity(sessionId, 'session_start', { timestamp });

    return sessionId;
  }

  /**
   * End an active session.
   * @param {string} sessionId
   * @param {string} [reason='user_logout']
   * @returns {?Object} The session summary, or null if the session is
   *   unknown or already ended.
   */
  endSession(sessionId, reason = 'user_logout') {
    const session = this.sessions.get(sessionId);
    if (!session || !session.active) return null;

    const timestamp = Math.floor(Date.now() / 1000);
    session.endTime = timestamp;
    session.duration = timestamp - session.startTime;

    // Log while the session is still active: logActivity() ignores inactive
    // sessions, so flipping the flag first would drop the 'session_end' entry
    this.logActivity(sessionId, 'session_end', {
      timestamp,
      reason,
      duration: session.duration
    });
    session.active = false;

    return this.getSessionSummary(sessionId);
  }

  /**
   * Record an action on an active session and refresh its lastActivity time.
   * @param {string} sessionId
   * @param {string} actionType - 'page_view' also increments pageViews.
   * @param {Object} [data={}] - Extra fields merged into the action record.
   * @returns {boolean} false if the session is unknown or inactive.
   */
  logActivity(sessionId, actionType, data = {}) {
    const session = this.sessions.get(sessionId);
    if (!session || !session.active) return false;

    const timestamp = Math.floor(Date.now() / 1000);
    session.lastActivity = timestamp;

    session.actions.push({
      timestamp,
      actionType,
      ...data
    });

    if (actionType === 'page_view') {
      session.pageViews++;
    }

    return true;
  }

  /**
   * Report a session's status, ending it if it has been idle for longer
   * than activityThreshold.
   * @param {string} sessionId
   * @returns {?Object} null for an unknown session; the session summary if
   *   the session was just timed out; otherwise
   *   { active, timeSinceActivity, sessionDuration }.
   */
  checkActiveSession(sessionId) {
    const session = this.sessions.get(sessionId);
    if (!session) return null;

    const now = Math.floor(Date.now() / 1000);
    const timeSinceActivity = now - session.lastActivity;

    if (timeSinceActivity > this.activityThreshold && session.active) {
      return this.endSession(sessionId, 'timeout');
    }

    return {
      active: session.active,
      timeSinceActivity,
      sessionDuration: now - session.startTime
    };
  }

  /**
   * Build a summary of a session's recorded data.
   * @param {string} sessionId
   * @returns {?Object} null for an unknown session.
   */
  getSessionSummary(sessionId) {
    const session = this.sessions.get(sessionId);
    if (!session) return null;

    return {
      sessionId: session.sessionId,
      userId: session.userId,
      startTime: session.startTime,
      endTime: session.endTime,
      duration: session.duration,
      pageViews: session.pageViews,
      totalActions: session.actions.length,
      actionBreakdown: this.getActionBreakdown(session.actions),
      peakActivity: this.findPeakActivity(session.actions),
      metadata: session.metadata
    };
  }

  /**
   * Count actions per actionType.
   * @param {Object[]} actions
   * @returns {Object<string, number>}
   */
  getActionBreakdown(actions) {
    const breakdown = {};
    actions.forEach(action => {
      breakdown[action.actionType] = (breakdown[action.actionType] || 0) + 1;
    });
    return breakdown;
  }

  /**
   * Find the UTC hour of day (0-23) with the most actions.
   * @param {Object[]} actions
   * @returns {?{hour: number, actionCount: number}} null when there are no actions.
   */
  findPeakActivity(actions) {
    if (actions.length === 0) return null;

    // Group actions by hour
    const hourlyActivity = {};
    actions.forEach(action => {
      const hour = new Date(action.timestamp * 1000).getUTCHours();
      hourlyActivity[hour] = (hourlyActivity[hour] || 0) + 1;
    });

    // Find peak hour
    const peakHour = Object.keys(hourlyActivity).reduce((a, b) =>
      hourlyActivity[a] > hourlyActivity[b] ? a : b
    );

    return {
      hour: parseInt(peakHour, 10),
      actionCount: hourlyActivity[peakHour]
    };
  }

  /**
   * Generate a session id prefixed with 'sess_'.
   * @returns {string}
   */
  generateSessionId() {
    // Use crypto for secure random IDs
    if (typeof crypto !== 'undefined' && crypto.randomUUID) {
      return 'sess_' + crypto.randomUUID();
    }
    // Fallback for older browsers (less secure)
    return 'sess_' + Math.random().toString(36).slice(2, 18) + '_' + Date.now();
  }

  /**
   * Delete sessions that started more than olderThanDays ago (run periodically).
   * @param {number} [olderThanDays=30]
   * @returns {number} How many sessions were removed.
   */
  cleanup(olderThanDays = 30) {
    const cutoff = Math.floor(Date.now() / 1000) - (olderThanDays * 86400);
    let cleaned = 0;

    for (const [sessionId, session] of this.sessions.entries()) {
      if (session.startTime < cutoff) {
        this.sessions.delete(sessionId);
        cleaned++;
      }
    }

    return cleaned;
  }
}
// Usage example
const sessionManager = new SessionManager();

// Start a session
const sessionId = sessionManager.startSession('user123', {
  userAgent: 'Mozilla/5.0...',
  ipAddress: '192.168.1.100',
  location: 'New York, NY'
});

// Log activities
sessionManager.logActivity(sessionId, 'page_view', { page: '/dashboard' });
sessionManager.logActivity(sessionId, 'button_click', { button: 'export', section: 'reports' });
sessionManager.logActivity(sessionId, 'form_submit', { form: 'user_profile' });

// Check session status
console.log(sessionManager.checkActiveSession(sessionId));

// End session and get summary
const summary = sessionManager.endSession(sessionId, 'user_logout');
console.log('Session Summary:', summary);
Content Scheduling
Schedule content publication, manage embargos, and handle time-based content delivery.
import time
import json
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import List, Dict, Optional


class ContentStatus(Enum):
    """Lifecycle states of a content item; the values are stored on the items."""

    DRAFT = "draft"
    SCHEDULED = "scheduled"
    PUBLISHED = "published"
    FAILED = "failed"
    CANCELLED = "cancelled"


class ContentScheduler:
    """In-memory scheduler that publishes content at Unix timestamps.

    `content_items` maps content ids to item dicts; `schedule_queue` holds the
    ids still waiting to be published, ordered by their 'publish_at' time.
    """

    # Seconds between occurrences for each supported recurring interval
    _RECURRING_INTERVALS = {
        'daily': 86400,
        'weekly': 604800,
        'monthly': 2592000  # Approximate (30 days)
    }

    def __init__(self):
        self.content_items = {}
        self.schedule_queue = []

    def _enqueue(self, content_id: str) -> None:
        """Put content_id in the queue (once) and keep the queue time-ordered."""
        # Re-scheduling an existing id must not create a duplicate queue entry
        if content_id not in self.schedule_queue:
            self.schedule_queue.append(content_id)
        self.schedule_queue.sort(key=lambda x: self.content_items[x]['publish_at'])

    def _dequeue(self, content_id: str) -> None:
        """Remove content_id from the queue if it is present."""
        if content_id in self.schedule_queue:
            self.schedule_queue.remove(content_id)

    def schedule_content(self, content_id: str, content_data: dict, publish_at: int,
                         recurring: Optional[dict] = None) -> dict:
        """Schedule content for future publication.

        Args:
            content_id: Unique id; an existing item with this id is replaced.
            content_data: Payload; 'author_id', 'title' and 'type' are copied
                into the item's metadata.
            publish_at: Unix timestamp at which the content becomes due.
            recurring: Optional config such as {'interval': 'weekly', 'count': 10}.

        Returns:
            The stored item dict.
        """
        scheduled_item = {
            'content_id': content_id,
            'content_data': content_data,
            'scheduled_at': int(time.time()),
            'publish_at': publish_at,
            'status': ContentStatus.SCHEDULED.value,
            'attempts': 0,
            'max_attempts': 3,
            'recurring': recurring,  # {'interval': 'weekly', 'count': 10}
            'metadata': {
                'created_by': content_data.get('author_id'),
                'title': content_data.get('title', ''),
                'type': content_data.get('type', 'post')
            }
        }

        self.content_items[content_id] = scheduled_item
        self._enqueue(content_id)

        return scheduled_item

    def batch_schedule(self, content_list: List[dict], start_time: int,
                       interval_minutes: int = 60) -> List[dict]:
        """Schedule multiple content items, spaced interval_minutes apart.

        The first item is due at start_time. Returns the stored items in order.
        """
        scheduled_items = []
        current_time = start_time
        # Nanosecond stamp taken once per batch: two batches created within the
        # same second would otherwise generate identical ids and overwrite
        # each other
        batch_stamp = time.time_ns()

        for i, content in enumerate(content_list):
            scheduled_items.append(
                self.schedule_content(f"batch_{batch_stamp}_{i}", content, current_time)
            )
            current_time += (interval_minutes * 60)

        return scheduled_items

    def process_due_content(self) -> List[dict]:
        """Publish every queued item whose publish time has passed.

        Returns one result dict per processed item (see _publish_content).
        Occurrences scheduled during this call are picked up by the next call.
        """
        current_time = int(time.time())
        processed = []

        # Snapshot the due ids first: publishing modifies the queue
        items_to_process = [
            cid for cid in self.schedule_queue
            if self.content_items[cid]['publish_at'] <= current_time
            and self.content_items[cid]['status'] == ContentStatus.SCHEDULED.value
        ]

        for content_id in items_to_process:
            result = self._publish_content(content_id)
            processed.append(result)

            # Handle recurring content
            if result['success'] and self.content_items[content_id].get('recurring'):
                self._schedule_next_occurrence(content_id)

        return processed

    def _publish_content(self, content_id: str) -> dict:
        """Attempt to publish one item and record the outcome on it.

        Returns a dict with 'content_id' and 'success', plus 'published_at' and
        'message' on success or 'error', 'attempts' and 'will_retry' on
        failure. After max_attempts failures the item is marked failed and
        removed from the queue; until then it stays queued for a retry.
        """
        item = self.content_items[content_id]
        item['attempts'] += 1

        try:
            # Simulate publication process
            # In real implementation, this would call your CMS API, social media API, etc.
            publication_result = self._simulate_publication(item['content_data'])
            error = None
            if not publication_result['success']:
                error = publication_result.get('error', 'Publication failed')
        except Exception as e:
            # Any failure of the publication backend counts as a failed attempt
            error = str(e)

        if error is None:
            item['status'] = ContentStatus.PUBLISHED.value
            item['published_at'] = int(time.time())
            item['publication_id'] = publication_result.get('id')
            self._dequeue(content_id)

            return {
                'content_id': content_id,
                'success': True,
                'published_at': item['published_at'],
                'message': 'Content published successfully'
            }

        will_retry = item['attempts'] < item['max_attempts']
        if not will_retry:
            item['status'] = ContentStatus.FAILED.value
            item['failure_reason'] = error
            self._dequeue(content_id)

        return {
            'content_id': content_id,
            'success': False,
            'error': error,
            'attempts': item['attempts'],
            'will_retry': will_retry
        }

    def _simulate_publication(self, content_data: dict) -> dict:
        """Simulate content publication (replace with actual API calls)."""
        # Simulate 90% success rate
        import random
        if random.random() < 0.9:
            return {
                'success': True,
                'id': f"pub_{int(time.time())}_{random.randint(1000, 9999)}",
                'url': f"https://example.com/posts/{content_data.get('title', '').lower().replace(' ', '-')}"
            }
        else:
            return {
                'success': False,
                'error': 'API rate limit exceeded'
            }

    def _schedule_next_occurrence(self, content_id: str):
        """Schedule the next occurrence of a recurring item.

        The new item is stored as "<original id>_recurring_<n>". Nothing is
        scheduled once the configured 'count' of occurrences is reached.
        Unknown intervals fall back to daily.
        """
        item = self.content_items[content_id]
        recurring_config = item['recurring']

        if not recurring_config:
            return

        # Calculate next publish time based on interval
        interval_seconds = self._RECURRING_INTERVALS.get(
            recurring_config.get('interval'), 86400
        )
        next_publish_time = item['publish_at'] + interval_seconds

        # Check if we've reached the count limit
        previous_count = recurring_config.get('current_count', 0)
        occurrence_count = previous_count + 1
        max_count = recurring_config.get('count', float('inf'))

        if occurrence_count >= max_count:
            return

        # Items generated here already carry a "_recurring_<n>" suffix; strip
        # it so ids stay "<id>_recurring_2" rather than growing into
        # "<id>_recurring_1_recurring_2"
        base_id = content_id
        if previous_count > 0:
            base_id = content_id.rsplit('_recurring_', 1)[0]
        next_content_id = f"{base_id}_recurring_{occurrence_count}"

        # Start from the current item but drop the outcome of its publication
        next_item = {
            key: value for key, value in item.items()
            if key not in ('published_at', 'publication_id', 'failure_reason')
        }
        next_item.update({
            'content_id': next_content_id,
            'publish_at': next_publish_time,
            'status': ContentStatus.SCHEDULED.value,
            'attempts': 0,
            # Fresh dicts: a shallow copy would share these with the published
            # item, and bumping the counter would rewrite that item's config
            'recurring': {**recurring_config, 'current_count': occurrence_count},
            'metadata': dict(item['metadata'])
        })

        self.content_items[next_content_id] = next_item
        self._enqueue(next_content_id)

    def get_schedule_summary(self, days_ahead: int = 7) -> dict:
        """Get a summary of content scheduled within the next days_ahead days.

        Returns a dict with 'total_upcoming', 'upcoming_content' (one entry per
        queued item due in the window) and 'next_24_hours'.
        """
        current_time = int(time.time())
        end_time = current_time + (days_ahead * 86400)

        upcoming = []
        for content_id in self.schedule_queue:
            item = self.content_items[content_id]
            if current_time <= item['publish_at'] <= end_time:
                upcoming.append({
                    'content_id': content_id,
                    'title': item['metadata']['title'],
                    'type': item['metadata']['type'],
                    'publish_at': item['publish_at'],
                    'publish_date': datetime.fromtimestamp(
                        item['publish_at'], tz=timezone.utc
                    ).isoformat(),
                    'days_until': (item['publish_at'] - current_time) // 86400
                })

        return {
            'total_upcoming': len(upcoming),
            'upcoming_content': upcoming,
            'next_24_hours': len([u for u in upcoming if u['days_until'] == 0])
        }

    def cancel_scheduled_content(self, content_id: str) -> bool:
        """Cancel scheduled content; returns False for an unknown id."""
        if content_id in self.content_items:
            self.content_items[content_id]['status'] = ContentStatus.CANCELLED.value
            self._dequeue(content_id)
            return True
        return False
# Usage example
scheduler = ContentScheduler()

# Schedule individual content
blog_post = {
    'title': 'Introduction to Unix Timestamps',
    'content': 'Unix timestamps are...',
    'author_id': 'user123',
    'type': 'blog_post',
    'tags': ['programming', 'time', 'unix']
}

publish_time = int(time.time()) + 3600  # 1 hour from now
scheduler.schedule_content('post_001', blog_post, publish_time)

# Schedule recurring content
newsletter_data = {
    'title': 'Weekly Developer Newsletter',
    'template': 'newsletter_template',
    'author_id': 'newsletter_bot',
    'type': 'newsletter'
}

# Every Monday at 9 AM (UTC) for 12 weeks
now_utc = datetime.now(timezone.utc)
days_until_monday = 7 - now_utc.weekday()  # weekday() is 0 on Monday, so 1-7 days ahead
next_monday_dt = (now_utc + timedelta(days=days_until_monday)).replace(
    hour=9, minute=0, second=0, microsecond=0
)
next_monday = int(next_monday_dt.timestamp())
scheduler.schedule_content('newsletter_weekly', newsletter_data, next_monday, {
    'interval': 'weekly',
    'count': 12
})

# Batch schedule social media posts
social_posts = [
    {'title': 'Tip 1: Use UTC for timestamps', 'type': 'social'},
    {'title': 'Tip 2: Consider timezone conversion', 'type': 'social'},
    {'title': 'Tip 3: Validate timestamp ranges', 'type': 'social'}
]

start_time = int(time.time()) + 1800  # Start in 30 minutes
scheduler.batch_schedule(social_posts, start_time, interval_minutes=30)

# Process due content (would run in a background job)
results = scheduler.process_due_content()
print(f"Processed {len(results)} content items")

# Get schedule overview
summary = scheduler.get_schedule_summary(7)
print(f"Next 7 days: {summary['total_upcoming']} items scheduled")
print(f"Next 24 hours: {summary['next_24_hours']} items due")
Performance Monitoring
Measure response times, track performance metrics, and optimize application speed.
/**
 * In-memory collector for request timing metrics.
 * startTime/endTime are Unix timestamps in seconds; durations are in
 * milliseconds.
 */
class PerformanceMonitor {
  constructor() {
    this.metrics = new Map();
    this.thresholds = {
      responseTime: 2000, // 2 seconds
      errorRate: 0.05, // 5%
      throughput: 100 // requests per minute (not currently checked by checkThresholds)
    };
    this.alertCallbacks = [];
  }

  /**
   * Start timing a request.
   * @param {string} requestId - Caller-chosen unique id.
   * @param {Object} [metadata={}] - Extra fields are kept as-is; method,
   *   endpoint, userId and userAgent get defaults.
   * @returns {string} The same requestId.
   */
  startRequest(requestId, metadata = {}) {
    // Read the clock once so startTime and startTimeMs describe the same instant
    const startTimeMs = Date.now();
    this.metrics.set(requestId, {
      startTime: Math.floor(startTimeMs / 1000),
      startTimeMs,
      endTime: null,
      duration: null,
      status: 'in_progress',
      metadata: {
        // Spread first so the defaults below cannot be overwritten by an
        // explicitly undefined/null field in the caller's metadata
        ...metadata,
        method: metadata.method || 'GET',
        endpoint: metadata.endpoint || '/',
        userId: metadata.userId || null,
        userAgent: metadata.userAgent || ''
      }
    });

    return requestId;
  }

  /**
   * Finish timing a request and check the alert thresholds.
   * @param {string} requestId
   * @param {number} [statusCode=200] - Codes >= 400 mark the request as an error.
   * @param {number} [responseSize=0]
   * @returns {?Object} The updated metric, or null for an unknown requestId.
   */
  endRequest(requestId, statusCode = 200, responseSize = 0) {
    const metric = this.metrics.get(requestId);
    if (!metric) return null;

    const endTimeMs = Date.now();

    metric.endTime = Math.floor(endTimeMs / 1000);
    metric.duration = endTimeMs - metric.startTimeMs;
    metric.status = statusCode >= 400 ? 'error' : 'success';
    metric.statusCode = statusCode;
    metric.responseSize = responseSize;

    // Check for performance issues
    this.checkThresholds(metric);

    return metric;
  }

  /**
   * Summarize completed requests that started within the last timeRangeMinutes.
   * @param {number} [timeRangeMinutes=60]
   * @returns {Object} { noData: true, timeRange } when nothing completed in
   *   the window; otherwise counts, response-time statistics (ms),
   *   throughput (requests per hour), errorRate (0-1) and breakdowns.
   */
  getMetricsSummary(timeRangeMinutes = 60) {
    const recentMetrics = this.getRecentMetrics(timeRangeMinutes);

    if (recentMetrics.length === 0) {
      return { noData: true, timeRange: timeRangeMinutes };
    }

    const durations = recentMetrics.map(m => m.duration);
    const errorCount = recentMetrics.filter(m => m.status === 'error').length;

    return {
      timeRange: timeRangeMinutes,
      totalRequests: recentMetrics.length,
      successfulRequests: recentMetrics.filter(m => m.status === 'success').length,
      errorRequests: errorCount,
      averageResponseTime: this.calculateAverage(durations),
      medianResponseTime: this.calculateMedian(durations),
      p95ResponseTime: this.calculatePercentile(durations, 95),
      throughput: recentMetrics.length / (timeRangeMinutes / 60), // requests per hour
      errorRate: errorCount / recentMetrics.length,
      endpointBreakdown: this.getEndpointBreakdown(recentMetrics),
      timeDistribution: this.getTimeDistribution(recentMetrics, 10) // 10-minute buckets
    };
  }

  /**
   * Per-endpoint request count, error count, average duration and error rate.
   * @param {Object[]} metrics - Completed metrics.
   * @returns {Object<string, {count: number, errors: number,
   *   avgDuration: number, errorRate: number}>}
   */
  getEndpointBreakdown(metrics) {
    const breakdown = {};

    metrics.forEach(metric => {
      const endpoint = metric.metadata.endpoint;
      if (!breakdown[endpoint]) {
        breakdown[endpoint] = {
          count: 0,
          errors: 0,
          totalDuration: 0,
          avgDuration: 0
        };
      }

      breakdown[endpoint].count++;
      breakdown[endpoint].totalDuration += metric.duration;

      if (metric.status === 'error') {
        breakdown[endpoint].errors++;
      }
    });

    // Calculate averages
    Object.keys(breakdown).forEach(endpoint => {
      const data = breakdown[endpoint];
      data.avgDuration = Math.round(data.totalDuration / data.count);
      data.errorRate = data.errors / data.count;
      delete data.totalDuration; // Clean up
    });

    return breakdown;
  }

  /**
   * Group metrics into fixed-size time buckets by start time.
   * @param {Object[]} metrics - Completed metrics.
   * @param {number} bucketMinutes - Bucket width in minutes.
   * @returns {Object[]} Buckets sorted by timestamp, each with request and
   *   error counts, totalDuration, avgDuration, errorRate and an ISO 'time'.
   */
  getTimeDistribution(metrics, bucketMinutes) {
    const buckets = {};
    const bucketSize = bucketMinutes * 60; // Convert to seconds

    metrics.forEach(metric => {
      // Round timestamp down to the start of its bucket
      const bucketTime = Math.floor(metric.startTime / bucketSize) * bucketSize;

      if (!buckets[bucketTime]) {
        buckets[bucketTime] = {
          timestamp: bucketTime,
          requests: 0,
          errors: 0,
          totalDuration: 0
        };
      }

      buckets[bucketTime].requests++;
      buckets[bucketTime].totalDuration += metric.duration;

      if (metric.status === 'error') {
        buckets[bucketTime].errors++;
      }
    });

    // Convert to array and calculate averages
    return Object.values(buckets)
      .map(bucket => ({
        ...bucket,
        avgDuration: Math.round(bucket.totalDuration / bucket.requests),
        errorRate: bucket.errors / bucket.requests,
        timestamp: bucket.timestamp,
        time: new Date(bucket.timestamp * 1000).toISOString()
      }))
      .sort((a, b) => a.timestamp - b.timestamp);
  }

  /**
   * Raise alerts for a slow response or a high error rate over the last
   * 5 minutes.
   * @param {Object} metric - The metric that was just completed.
   */
  checkThresholds(metric) {
    const alerts = [];

    // Check response time
    if (metric.duration > this.thresholds.responseTime) {
      alerts.push({
        type: 'slow_response',
        message: `Slow response time: ${metric.duration}ms (threshold: ${this.thresholds.responseTime}ms)`,
        metric,
        severity: 'warning'
      });
    }

    // Check recent error rate
    const recentMetrics = this.getRecentMetrics(5); // Last 5 minutes
    // Guard the division: with no recent metrics the rate would be 0/0 = NaN
    const errorRate = recentMetrics.length === 0
      ? 0
      : recentMetrics.filter(m => m.status === 'error').length / recentMetrics.length;

    if (errorRate > this.thresholds.errorRate) {
      alerts.push({
        type: 'high_error_rate',
        message: `High error rate: ${(errorRate * 100).toFixed(2)}% (threshold: ${(this.thresholds.errorRate * 100)}%)`,
        metric,
        severity: 'critical'
      });
    }

    // Trigger alerts
    alerts.forEach(alert => this.triggerAlert(alert));
  }

  /**
   * Completed metrics whose request started within the last `minutes` minutes.
   * @param {number} minutes
   * @returns {Object[]}
   */
  getRecentMetrics(minutes) {
    const now = Math.floor(Date.now() / 1000);
    const startTime = now - (minutes * 60);

    return Array.from(this.metrics.values())
      .filter(m => m.startTime >= startTime && m.endTime !== null);
  }

  /**
   * Log an alert and pass it to every registered callback.
   * A callback that throws is logged and does not stop the others.
   * @param {Object} alert
   */
  triggerAlert(alert) {
    console.warn(`PERFORMANCE ALERT: ${alert.message}`);

    // Call registered alert callbacks
    this.alertCallbacks.forEach(callback => {
      try {
        callback(alert);
      } catch (error) {
        console.error('Alert callback error:', error);
      }
    });
  }

  /**
   * Register a callback that receives every alert.
   * @param {function(Object): void} callback
   */
  onAlert(callback) {
    this.alertCallbacks.push(callback);
  }

  /**
   * Delete metrics whose request started more than olderThanHours ago.
   * @param {number} [olderThanHours=24]
   * @returns {number} How many metrics were removed.
   */
  cleanup(olderThanHours = 24) {
    const cutoff = Math.floor(Date.now() / 1000) - (olderThanHours * 3600);
    let cleaned = 0;

    for (const [requestId, metric] of this.metrics.entries()) {
      if (metric.startTime < cutoff) {
        this.metrics.delete(requestId);
        cleaned++;
      }
    }

    return cleaned;
  }

  // Utility methods

  /** Rounded arithmetic mean; 0 for an empty list. */
  calculateAverage(numbers) {
    return numbers.length ? Math.round(numbers.reduce((a, b) => a + b, 0) / numbers.length) : 0;
  }

  /** Median (rounded for even-length lists); 0 for an empty list. */
  calculateMedian(numbers) {
    if (numbers.length === 0) return 0;
    // Sort a copy: Array.prototype.sort() reorders the caller's array in place
    const sorted = [...numbers].sort((a, b) => a - b);
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 ? sorted[mid] : Math.round((sorted[mid - 1] + sorted[mid]) / 2);
  }

  /** Nearest-rank percentile; 0 for an empty list. */
  calculatePercentile(numbers, percentile) {
    const sorted = [...numbers].sort((a, b) => a - b);
    const index = Math.ceil((percentile / 100) * sorted.length) - 1;
    return sorted[index] || 0;
  }
}
236
237// Usage example
238const monitor = new PerformanceMonitor();
239
240// Set up alert handling
241monitor.onAlert(alert => {
242 // Send to logging service, Slack, email, etc.
243 console.log(`Alert: ${alert.type} - ${alert.message}`);
244});
245
246// Simulate API requests
247function simulateApiRequest(endpoint, method = 'GET') {
248 const requestId = `req_${Date.now()}_${Math.random().toString(36).substr(2, 5)}`;
249
250 monitor.startRequest(requestId, {
251 endpoint,
252 method,
253 userId: 'user123',
254 userAgent: 'Test Agent'
255 });
256
257 // Simulate request processing time
258 setTimeout(() => {
259 const statusCode = Math.random() < 0.95 ? 200 : 500; // 5% error rate
260 const responseSize = Math.floor(Math.random() * 5000) + 100;
261
262 monitor.endRequest(requestId, statusCode, responseSize);
263 }, Math.random() * 3000); // Random response time up to 3 seconds
264}
265
266// Generate test traffic
267setInterval(() => {
268 simulateApiRequest('/api/users');
269 simulateApiRequest('/api/posts');
270 simulateApiRequest('/api/analytics');
271}, 1000);
272
273// Generate performance report every 5 minutes
274setInterval(() => {
275 const summary = monitor.getMetricsSummary(5);
276 console.log('Performance Summary (5 min):', {
277 requests: summary.totalRequests,
278 avgResponse: summary.averageResponseTime + 'ms',
279 errorRate: (summary.errorRate * 100).toFixed(2) + '%',
280 throughput: summary.throughput.toFixed(1) + ' req/hour'
281 });
282}, 300000);
283
284// Cleanup old metrics every hour
285setInterval(() => {
286 const cleaned = monitor.cleanup(24);
287 console.log(`Cleaned up ${cleaned} old metrics`);
288}, 3600000);
Advanced Patterns
Timezone Handling
Work with multiple timezones, handle daylight saving time, and ensure global compatibility.
// Helpers for displaying and converting Unix timestamps (in seconds) across
// time zones. Zone arguments may be one of the abbreviations in
// `this.timezones` or any zone name accepted by Intl.DateTimeFormat.
//
// All zone arithmetic is done with Intl.DateTimeFormat#formatToParts, so the
// results do not depend on the time zone of the machine running the code.
class TimezoneHandler {
  constructor() {
    // Common abbreviations mapped to the zone names handed to Intl.
    this.timezones = {
      'EST': 'America/New_York',
      'PST': 'America/Los_Angeles',
      'GMT': 'GMT',
      'UTC': 'UTC',
      'CET': 'Europe/Paris',
      'JST': 'Asia/Tokyo'
    };
  }

  // Map an abbreviation to its zone name; unknown names pass through as-is.
  _resolveZone(timezone) {
    return this.timezones[timezone] || timezone;
  }

  // Wall-clock fields ({year, month (1-12), day, hour, minute, second}) of
  // the instant `epochMs` as observed in zone `tz`.
  // Throws RangeError when `tz` is not a valid zone name.
  _wallClock(epochMs, tz) {
    const parts = new Intl.DateTimeFormat('en-US', {
      timeZone: tz,
      hourCycle: 'h23',
      year: 'numeric',
      month: 'numeric',
      day: 'numeric',
      hour: 'numeric',
      minute: 'numeric',
      second: 'numeric'
    }).formatToParts(new Date(epochMs));

    const fields = {};
    for (const { type, value } of parts) {
      if (type !== 'literal') {
        fields[type] = Number(value);
      }
    }
    fields.hour %= 24; // guard against engines that print midnight as 24
    return fields;
  }

  // Offset of zone `tz` from UTC at the instant `epochMs`, in milliseconds.
  // Positive values are east of UTC (local time is ahead of UTC).
  _offsetMs(epochMs, tz) {
    const w = this._wallClock(epochMs, tz);
    const wallAsUtc = Date.UTC(w.year, w.month - 1, w.day, w.hour, w.minute, w.second);
    // The formatter drops milliseconds, so compare against the whole second.
    return wallAsUtc - Math.floor(epochMs / 1000) * 1000;
  }

  // Convert wall-clock fields observed in zone `tz` to epoch milliseconds.
  _localToEpochMs({ year, month, day, hour, minute, second }, tz) {
    const wallAsUtc = Date.UTC(year, month - 1, day, hour, minute, second);
    // First pass uses the offset at the naive guess; the second pass
    // re-reads the offset at the candidate instant so times near a DST
    // transition pick up the right offset.
    const candidate = wallAsUtc - this._offsetMs(wallAsUtc, tz);
    return wallAsUtc - this._offsetMs(candidate, tz);
  }

  // Split a date string into wall-clock fields.
  // - 'YYYY-MM-DD' optionally followed by 'T' or a space and 'HH:MM[:SS[.fff]]'
  //   is read field by field. If it ends in 'Z' or '+HH:MM' / '-HH:MM' the
  //   result also carries `epochMs`, the absolute instant the string names.
  // - Any other format is handed to `new Date(...)`; strings without a zone
  //   designator are parsed in host local time, so the local getters return
  //   the fields as written. NOTE(review): fallback strings that carry their
  //   own zone designator are not supported here.
  // Throws Error('Invalid date string') when the string cannot be parsed.
  _parseWallClock(dateString) {
    const text = String(dateString).trim();
    const iso = /^(\d{4})-(\d{2})-(\d{2})(?:[T ](\d{2}):(\d{2})(?::(\d{2})(?:\.\d+)?)?)?\s*(Z|[+-]\d{2}:?\d{2})?$/i.exec(text);

    if (iso) {
      const [year, month, day, hour, minute, second] = iso
        .slice(1, 7)
        .map(v => (v === undefined ? 0 : Number(v)));

      // Date.UTC silently rolls over out-of-range fields (month 13, day 40),
      // so round-trip the fields to reject them.
      const check = new Date(Date.UTC(year, month - 1, day, hour, minute, second));
      const valid =
        check.getUTCFullYear() === year &&
        check.getUTCMonth() === month - 1 &&
        check.getUTCDate() === day &&
        check.getUTCHours() === hour &&
        check.getUTCMinutes() === minute &&
        check.getUTCSeconds() === second;
      if (!valid) {
        throw new Error('Invalid date string');
      }

      const fields = { year, month, day, hour, minute, second };
      const designator = iso[7];
      if (designator) {
        let offsetMinutes = 0;
        if (designator.toUpperCase() !== 'Z') {
          const sign = designator[0] === '-' ? -1 : 1;
          const digits = designator.slice(1).replace(':', '');
          offsetMinutes = sign * (Number(digits.slice(0, 2)) * 60 + Number(digits.slice(2)));
        }
        fields.epochMs = check.getTime() - offsetMinutes * 60000;
      }
      return fields;
    }

    const parsed = new Date(dateString);
    if (isNaN(parsed.getTime())) {
      throw new Error('Invalid date string');
    }
    return {
      year: parsed.getFullYear(),
      month: parsed.getMonth() + 1,
      day: parsed.getDate(),
      hour: parsed.getHours(),
      minute: parsed.getMinutes(),
      second: parsed.getSeconds()
    };
  }

  // Convert Unix timestamp to local time for display.
  //   format: 'full' (long month, seconds and zone name), 'short' (short
  //   month), anything else (long month, no seconds or zone name).
  // Returns the formatted string, or 'Error: <message>' if formatting fails
  // (for example on an unknown zone name).
  formatForTimezone(timestamp, timezone, format = 'full') {
    try {
      const date = new Date(timestamp * 1000);
      const tz = this._resolveZone(timezone);

      const formatter = new Intl.DateTimeFormat('en-US', {
        timeZone: tz,
        year: 'numeric',
        month: format === 'short' ? 'short' : 'long',
        day: 'numeric',
        hour: '2-digit',
        minute: '2-digit',
        second: format === 'full' ? '2-digit' : undefined,
        timeZoneName: format === 'full' ? 'short' : undefined
      });

      return formatter.format(date);
    } catch (error) {
      return `Error: ${error.message}`;
    }
  }

  // Get the UTC offset of `timezone` at a specific timestamp.
  // Returns { offsetMs, offsetHours, offsetString }. Offsets are positive
  // east of UTC, so Asia/Tokyo yields offsetHours 9 and 'UTC+9', and
  // America/New_York in winter yields -5 and 'UTC-5'.
  // Throws RangeError on an unknown zone name.
  getTimezoneOffset(timestamp, timezone) {
    const offsetMs = this._offsetMs(timestamp * 1000, this._resolveZone(timezone));
    const offsetHours = offsetMs / (1000 * 60 * 60);

    return {
      offsetMs,
      offsetHours,
      offsetString: `UTC${offsetHours >= 0 ? '+' : ''}${offsetHours}`
    };
  }

  // Describe one instant in two zones. Unix timestamps are already
  // timezone-independent, so the timestamp itself is returned unchanged;
  // only the formatted strings and offsets differ.
  convertTimezone(timestamp, fromTz, toTz) {
    return {
      timestamp, // Same timestamp
      fromFormatted: this.formatForTimezone(timestamp, fromTz),
      toFormatted: this.formatForTimezone(timestamp, toTz),
      fromOffset: this.getTimezoneOffset(timestamp, fromTz),
      toOffset: this.getTimezoneOffset(timestamp, toTz)
    };
  }

  // Parse a local time string as wall-clock time in `timezone` and return
  // the matching Unix timestamp in seconds. If the string names its own zone
  // ('Z' or a numeric offset on an ISO-style string), that zone wins.
  // Throws Error('Failed to parse local time: ...') on any failure.
  parseLocalTime(dateString, timezone) {
    try {
      const tz = this._resolveZone(timezone);
      const wall = this._parseWallClock(dateString);

      if (wall.epochMs !== undefined) {
        return Math.floor(wall.epochMs / 1000);
      }
      return Math.floor(this._localToEpochMs(wall, tz) / 1000);
    } catch (error) {
      throw new Error(`Failed to parse local time: ${error.message}`);
    }
  }

  // Report whether `timestamp` falls inside business hours (Monday-Friday,
  // businessStart <= hour < businessEnd) in `timezone`, and when the next
  // business day starts if it does not.
  // Returns { isBusinessHours, isWeekday, localHour, nextBusinessHour,
  // businessHours, timezone }; nextBusinessHour is a Unix timestamp in
  // seconds, or null when the instant is already inside business hours.
  getBusinessHours(timestamp, timezone, businessStart = 9, businessEnd = 17) {
    const tz = this._resolveZone(timezone);
    const local = this._wallClock(timestamp * 1000, tz);
    const localHour = local.hour;

    // Day of week of the local calendar date (0 = Sunday ... 6 = Saturday).
    const dayOfWeek = new Date(Date.UTC(local.year, local.month - 1, local.day)).getUTCDay();
    const isWeekday = dayOfWeek >= 1 && dayOfWeek <= 5;
    const isBusinessHours = localHour >= businessStart && localHour < businessEnd;

    // Calculate next business hour
    let nextBusinessHour = null;
    if (!isBusinessHours || !isWeekday) {
      let daysToAdd = 0; // weekday before opening: later the same day
      if (dayOfWeek === 6) {
        daysToAdd = 2; // Saturday -> Monday
      } else if (dayOfWeek === 0) {
        daysToAdd = 1; // Sunday -> Monday
      } else if (localHour >= businessEnd) {
        daysToAdd = dayOfWeek === 5 ? 3 : 1; // Friday evening -> Monday
      }

      // Date.UTC normalises day overflow across month and year boundaries.
      const target = new Date(Date.UTC(local.year, local.month - 1, local.day + daysToAdd));
      const openingMs = this._localToEpochMs({
        year: target.getUTCFullYear(),
        month: target.getUTCMonth() + 1,
        day: target.getUTCDate(),
        hour: businessStart,
        minute: 0,
        second: 0
      }, tz);
      nextBusinessHour = Math.floor(openingMs / 1000);
    }

    return {
      isBusinessHours: isBusinessHours && isWeekday,
      isWeekday,
      localHour,
      nextBusinessHour,
      businessHours: `${businessStart}:00 - ${businessEnd}:00`,
      timezone: tz
    };
  }
}
137
// Usage examples
const tzHandler = new TimezoneHandler();

// Current time as a Unix timestamp in seconds
const timestamp = Math.floor(Date.now() / 1000);

// Format the same instant for several timezones
const displayZones = [
  ['New York', 'EST'],
  ['Los Angeles', 'PST'],
  ['Tokyo', 'JST']
];
for (const [label, zone] of displayZones) {
  console.log(`${label}:`, tzHandler.formatForTimezone(timestamp, zone));
}

// Convert between timezones
const conversion = tzHandler.convertTimezone(timestamp, 'EST', 'JST');
console.log('Timezone conversion:', conversion);

// Check business hours
const businessHours = tzHandler.getBusinessHours(timestamp, 'EST');
console.log('Business hours info:', businessHours);

// Parse local time; parseLocalTime throws on failure, so report the message
try {
  const localTimestamp = tzHandler.parseLocalTime('2023-12-31 15:30:00', 'EST');
  console.log('Parsed local time to timestamp:', localTimestamp);
} catch (error) {
  console.error('Parse error:', error.message);
}
Batch Processing
Process large volumes of timestamp data efficiently and handle bulk operations.
1import csv
2import time
3import json
4from datetime import datetime, timezone
5from typing import List, Dict, Iterator, Optional
6import concurrent.futures
7from dataclasses import dataclass
8
@dataclass
class ProcessingResult:
    """Outcome of converting one input value to a Unix timestamp."""

    success: bool
    original_value: str
    timestamp: Optional[int] = None  # Unix seconds, set on success
    formatted_date: Optional[str] = None  # ISO 8601 UTC form of `timestamp`
    error: Optional[str] = None  # failure reason, set when success is False


class TimestampBatchProcessor:
    """Convert batches of date strings / Unix timestamps to Unix seconds.

    Values are split into chunks of ``chunk_size`` and converted on a thread
    pool of ``max_workers`` threads. Date strings without timezone
    information are interpreted as UTC.
    """

    def __init__(self, chunk_size: int = 1000, max_workers: int = 4):
        self.chunk_size = chunk_size
        self.max_workers = max_workers
        # Tried in order by detect_timestamp_format; the first format that
        # parses wins, so ambiguous dates such as 01/02/2023 are read as
        # month/day before day/month.
        self.supported_formats = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d',
            '%m/%d/%Y %H:%M:%S',
            '%m/%d/%Y',
            '%d/%m/%Y %H:%M:%S',
            '%d/%m/%Y',
            '%Y-%m-%dT%H:%M:%SZ',
            '%Y-%m-%dT%H:%M:%S',
            '%a %b %d %H:%M:%S %Y',  # Sun Dec 31 00:00:00 2023
        ]

    def detect_timestamp_format(self, value: str) -> Optional[str]:
        """Detect the format of a timestamp string.

        Returns ``'unix'`` for a number between 0 and 4102444800 (1970-2100
        in seconds), the first matching entry of ``supported_formats`` for a
        date string, or ``None`` when nothing matches.
        """
        value = value.strip()

        # Check if it's already a Unix timestamp
        try:
            timestamp = float(value)
        except ValueError:
            pass
        else:
            # Validate range (between 1970 and 2100)
            if 0 <= timestamp <= 4102444800:
                return 'unix'

        # Try different date formats
        for fmt in self.supported_formats:
            try:
                datetime.strptime(value, fmt)
            except ValueError:
                continue
            return fmt

        return None

    def convert_to_timestamp(self, value: str, format_hint: Optional[str] = None) -> ProcessingResult:
        """Convert a single value to a Unix timestamp.

        Args:
            value: Date string or numeric Unix timestamp.
            format_hint: ``'unix'`` or a ``strptime`` format to use instead
                of auto-detection.

        Returns:
            A ProcessingResult. Failures never raise; they are reported with
            ``success=False`` and a message in ``error``.
        """
        try:
            value = value.strip()

            if not value:
                return ProcessingResult(
                    success=False,
                    original_value=value,
                    error="Empty value"
                )

            # Use provided format hint or detect format
            detected_format = format_hint or self.detect_timestamp_format(value)

            if not detected_format:
                return ProcessingResult(
                    success=False,
                    original_value=value,
                    error="Unknown timestamp format"
                )

            if detected_format == 'unix':
                timestamp = int(float(value))
            else:
                dt = datetime.strptime(value, detected_format)
                # Assume UTC if no timezone info
                dt = dt.replace(tzinfo=timezone.utc)
                timestamp = int(dt.timestamp())

            # Format back to readable date for verification
            formatted_date = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

            return ProcessingResult(
                success=True,
                original_value=value,
                timestamp=timestamp,
                formatted_date=formatted_date
            )

        except Exception as e:
            # Per-item boundary: one bad value (wrong type, unparsable text,
            # out-of-range date) must not abort the rest of the batch.
            return ProcessingResult(
                success=False,
                original_value=value,
                error=str(e)
            )

    def process_chunk(self, chunk: List[str], format_hint: Optional[str] = None) -> List[ProcessingResult]:
        """Convert every value in ``chunk``, preserving order."""
        return [self.convert_to_timestamp(value, format_hint) for value in chunk]

    def process_batch(self, values: List[str], format_hint: Optional[str] = None) -> Dict:
        """Convert a large batch of values on a thread pool.

        Returns a dict with ``total_processed``, ``successful``, ``errors``,
        ``success_rate``, ``processing_time``, ``throughput`` and
        ``results``. ``results`` holds exactly one ProcessingResult per input
        value, in input order.
        """
        start_time = time.time()

        # Split into chunks
        chunks = [values[i:i + self.chunk_size] for i in range(0, len(values), self.chunk_size)]

        # One slot per chunk: futures finish in arbitrary order, so results
        # are stored by chunk index and stitched together afterwards.
        results_by_chunk: List[List[ProcessingResult]] = [[] for _ in chunks]

        # Process chunks in parallel
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_chunk = {
                executor.submit(self.process_chunk, chunk, format_hint): i
                for i, chunk in enumerate(chunks)
            }

            for future in concurrent.futures.as_completed(future_to_chunk):
                chunk_index = future_to_chunk[future]
                try:
                    results_by_chunk[chunk_index] = future.result()
                except Exception as e:
                    print(f"Chunk {chunk_index} processing failed: {e}")
                    # Keep a placeholder per value so results stay aligned
                    # with the inputs.
                    results_by_chunk[chunk_index] = [
                        ProcessingResult(
                            success=False,
                            original_value=value,
                            error=f"Chunk processing failed: {e}"
                        )
                        for value in chunks[chunk_index]
                    ]

        all_results = [result for chunk_results in results_by_chunk for result in chunk_results]
        successful_count = sum(1 for result in all_results if result.success)
        error_count = len(all_results) - successful_count

        processing_time = time.time() - start_time

        return {
            'total_processed': len(values),
            'successful': successful_count,
            'errors': error_count,
            'success_rate': successful_count / len(values) if values else 0,
            'processing_time': processing_time,
            'throughput': len(values) / processing_time if processing_time > 0 else 0,
            'results': all_results
        }

    def process_csv_file(self, file_path: str, timestamp_column: str,
                         output_file: Optional[str] = None) -> Dict:
        """Convert the ``timestamp_column`` of a CSV file.

        Each output row is the input row plus ``<column>_unix``,
        ``<column>_iso`` and ``<column>_error``. When ``output_file`` is
        given the rows are also written there.

        Returns the process_batch summary extended with ``input_file``,
        ``output_file``, ``total_time``, ``rows_processed`` and
        ``output_data``; on failure returns
        ``{'success': False, 'error': ..., 'input_file': ...}`` instead of
        raising.
        """
        start_time = time.time()

        try:
            with open(file_path, 'r', newline='', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)

                # fieldnames is None for an empty file
                if not reader.fieldnames or timestamp_column not in reader.fieldnames:
                    raise ValueError(f"Column '{timestamp_column}' not found in CSV")

                # Read all rows while the file is still open
                rows = list(reader)

            timestamp_values = [row[timestamp_column] for row in rows]

            # Process the timestamps
            batch_result = self.process_batch(timestamp_values)

            # Create output with original data plus converted timestamps
            output_rows = []
            for row, result in zip(rows, batch_result['results']):
                output_row = row.copy()
                output_row[f'{timestamp_column}_unix'] = result.timestamp if result.success else None
                output_row[f'{timestamp_column}_iso'] = result.formatted_date if result.success else None
                output_row[f'{timestamp_column}_error'] = result.error if not result.success else None
                output_rows.append(output_row)

            # Save to output file if specified
            if output_file:
                self.save_csv_results(output_rows, output_file)

            total_time = time.time() - start_time

            return {
                **batch_result,
                'input_file': file_path,
                'output_file': output_file,
                'total_time': total_time,
                'rows_processed': len(rows),
                'output_data': output_rows
            }

        except Exception as e:
            # Deliberate best-effort boundary: callers get an error dict that
            # generate_report understands.
            return {
                'success': False,
                'error': str(e),
                'input_file': file_path
            }

    def save_csv_results(self, data: List[Dict], output_file: str):
        """Write ``data`` to ``output_file`` as UTF-8 CSV.

        The header comes from the keys of the first row. Nothing is written
        when ``data`` is empty.
        """
        if not data:
            return

        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            fieldnames = list(data[0].keys())
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)

    def generate_report(self, results: Dict) -> str:
        """Render a process_batch / process_csv_file result as text.

        Returns ``"Processing failed: ..."`` when ``results`` carries an
        ``error`` key; otherwise a summary followed by a per-message error
        breakdown when any conversion failed.
        """
        if 'error' in results:
            return f"Processing failed: {results['error']}"

        summary_lines = [
            "",
            "Timestamp Processing Report",
            "===========================",
            f"Total records: {results['total_processed']:,}",
            f"Successful conversions: {results['successful']:,}",
            f"Failed conversions: {results['errors']:,}",
            f"Success rate: {results['success_rate']:.2%}",
            f"Processing time: {results['processing_time']:.2f} seconds",
            f"Throughput: {results['throughput']:.0f} records/second",
            "",
            "",
        ]
        report = "\n".join(summary_lines)

        # Error analysis
        if results['errors'] > 0:
            error_types: Dict[str, int] = {}
            for result in results['results']:
                if not result.success:
                    error_type = result.error or 'Unknown error'
                    error_types[error_type] = error_types.get(error_type, 0) + 1

            report += "Error Breakdown:\n"
            report += "".join(
                f"  {error_type}: {count} occurrences\n"
                for error_type, count in sorted(error_types.items())
            )

        return report
251
# Usage examples
processor = TimestampBatchProcessor(chunk_size=500, max_workers=4)

# Process a list of mixed timestamp formats
mixed_timestamps = [
    '2023-12-31 00:00:00',
    '1703980800',
    '12/31/2023',
    '2023-12-31T00:00:00Z',
    'Mon Dec 31 00:00:00 2023',
    'invalid_date',
    '31/12/2023',
    '1703980800000',  # milliseconds
]

# Process the batch
results = processor.process_batch(mixed_timestamps)
print(processor.generate_report(results))

# Process CSV file example
# Assume we have a CSV file with timestamp data
sample_csv_data = [
    {'id': 1, 'event': 'login', 'timestamp': '2023-12-31 09:00:00'},
    {'id': 2, 'event': 'purchase', 'timestamp': '1703984400'},
    {'id': 3, 'event': 'logout', 'timestamp': '12/31/2023 10:30:00'},
]

# Create sample CSV file. Written as UTF-8 because process_csv_file opens its
# input with encoding='utf-8'; the platform default may differ.
with open('sample_events.csv', 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ['id', 'event', 'timestamp']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(sample_csv_data)

# Process the CSV file
csv_results = processor.process_csv_file(
    'sample_events.csv',
    'timestamp',
    'converted_events.csv'
)

print(processor.generate_report(csv_results))
294
# Example of custom format detection for specific use case
def process_log_files(log_files: List[str], timestamp_pattern: str):
    """Process multiple log files with a custom timestamp pattern.

    Args:
        log_files: Names of the log files; used as keys of the result.
        timestamp_pattern: ``strptime`` format applied to every timestamp.

    Returns:
        Dict mapping each log file name to its ``process_batch`` summary.
    """
    all_results = {}

    for log_file in log_files:
        print(f"Processing {log_file}...")

        # This would parse log files and extract timestamps
        # For demonstration, we'll use sample data
        sample_log_timestamps = [
            '2023-12-31T09:00:00.123Z',
            '2023-12-31T09:15:30.456Z',
            '2023-12-31T09:30:45.789Z',
        ]

        # Pass the caller's pattern as the format hint: the fractional
        # seconds in these samples match none of the auto-detected formats.
        results = processor.process_batch(
            sample_log_timestamps,
            format_hint=timestamp_pattern,
        )
        all_results[log_file] = results

        print(f"Processed {results['total_processed']} timestamps")
        print(f"Success rate: {results['success_rate']:.2%}")

    return all_results
318
# Process multiple log files that share one timestamp layout
# (ISO date and time with fractional seconds and a trailing 'Z')
log_results = process_log_files(
    ['app.log', 'error.log', 'access.log'],
    '%Y-%m-%dT%H:%M:%S.%fZ',
)
print("Log processing completed")