noot
This commit is contained in:
parent
542070a4cb
commit
e8be23774f
41
migrations/005_wynn_items_fullresult.sql
Normal file
41
migrations/005_wynn_items_fullresult.sql
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
-- Write your migrate up statements here
|
||||||
|
|
||||||
|
-- Create new table to store entire fullresult
|
||||||
|
create table wynn.items_fullresult (
|
||||||
|
hash text not null primary key,
|
||||||
|
data jsonb not null,
|
||||||
|
timestamp timestamptz not null default now()
|
||||||
|
);
|
||||||
|
|
||||||
|
create index items_fullresult_timestamp on wynn.items_fullresult (timestamp);
|
||||||
|
|
||||||
|
-- Drop the old items table
|
||||||
|
drop table if exists wynn.items;
|
||||||
|
|
||||||
|
-- Remove the old hash tracking
|
||||||
|
delete from meta.hashes where key = 'wynn.items';
|
||||||
|
|
||||||
|
---- create above / drop below ----
|
||||||
|
|
||||||
|
-- Recreate the old items table
|
||||||
|
create table wynn.items (
|
||||||
|
name text not null primary key,
|
||||||
|
display_name text not null,
|
||||||
|
type text not null,
|
||||||
|
hash text not null,
|
||||||
|
data jsonb not null
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Restore hash tracking
|
||||||
|
insert into meta.hashes (key, value)
|
||||||
|
select 'wynn.items', hash
|
||||||
|
from wynn.items_fullresult
|
||||||
|
order by timestamp desc
|
||||||
|
limit 1
|
||||||
|
on conflict do nothing;
|
||||||
|
|
||||||
|
-- Drop the new table
|
||||||
|
drop table if exists wynn.items_fullresult;
|
||||||
|
|
||||||
|
-- Write your migrate down statements here. If this migration is irreversible
|
||||||
|
-- Then delete the separator line above.
|
||||||
@ -17,8 +17,15 @@ export async function update_wynn_items() {
|
|||||||
if (parsed instanceof ArkErrors) {
|
if (parsed instanceof ArkErrors) {
|
||||||
throw parsed
|
throw parsed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validate we have a reasonable number of items
|
||||||
|
const itemCount = Object.keys(parsed).length
|
||||||
|
if (itemCount < 100) {
|
||||||
|
throw new Error(`Received suspiciously low number of items: ${itemCount}. Refusing to update to prevent data loss.`)
|
||||||
|
}
|
||||||
|
|
||||||
const { sql } = await c.getAsync(PG)
|
const { sql } = await c.getAsync(PG)
|
||||||
// iterate over all items with their names
|
// serialize the entire fullresult
|
||||||
const serializedData = stringify(parsed)
|
const serializedData = stringify(parsed)
|
||||||
if (!serializedData) {
|
if (!serializedData) {
|
||||||
throw new Error('Failed to serialize wynn items')
|
throw new Error('Failed to serialize wynn items')
|
||||||
@ -26,34 +33,20 @@ export async function update_wynn_items() {
|
|||||||
const dataHash = sha1Hash(serializedData)
|
const dataHash = sha1Hash(serializedData)
|
||||||
let found_new = false
|
let found_new = false
|
||||||
await sql.begin(async (sql) => {
|
await sql.begin(async (sql) => {
|
||||||
const [{ currenthash } = {}] = await sql`select value as currenthash from meta.hashes where key = 'wynn.items' limit 1`
|
// check if this hash already exists
|
||||||
if (currenthash === dataHash) {
|
const [existing] = await sql`select hash from wynn.items_fullresult where hash = ${dataHash} limit 1`
|
||||||
|
if (existing) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
found_new = true
|
found_new = true
|
||||||
log.info('updating wynn with new hash', { old: currenthash, new: dataHash })
|
log.info('updating wynn items with new hash', { hash: dataHash, itemCount })
|
||||||
for (const [displayName, item] of Object.entries(parsed)) {
|
// insert the entire fullresult as a single entry
|
||||||
const json = stringify(item)
|
await sql`insert into wynn.items_fullresult(hash, data, timestamp) values
|
||||||
if (!json) {
|
(${dataHash}, ${serializedData}::jsonb, ${new Date()})
|
||||||
throw new Error('Failed to serialize wynn item')
|
on conflict (hash) do nothing`
|
||||||
}
|
|
||||||
const itemHash = sha1Hash(json)
|
|
||||||
// insert the items
|
|
||||||
await sql`insert into wynn.items(name, display_name, type, data, hash) values
|
|
||||||
(${item.internalName}, ${displayName}, ${item.type}, ${json}, ${itemHash})
|
|
||||||
on conflict (name) do update set
|
|
||||||
display_name = EXCLUDED.display_name,
|
|
||||||
type = EXCLUDED.type,
|
|
||||||
data = EXCLUDED.data,
|
|
||||||
hash = EXCLUDED.hash`
|
|
||||||
// update the hash
|
|
||||||
await sql`insert into meta.hashes(key, value) values
|
|
||||||
('wynn.items', ${dataHash})
|
|
||||||
on conflict (key) do update set
|
|
||||||
value = EXCLUDED.value`
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
return {
|
return {
|
||||||
found_new,
|
found_new,
|
||||||
|
itemCount,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,12 +2,16 @@ import { proxyActivities } from '@temporalio/workflow'
|
|||||||
import type * as activities from '#/activities'
|
import type * as activities from '#/activities'
|
||||||
|
|
||||||
const { update_wynn_items } = proxyActivities<typeof activities>({
|
const { update_wynn_items } = proxyActivities<typeof activities>({
|
||||||
startToCloseTimeout: '1 minute',
|
startToCloseTimeout: '2 minutes',
|
||||||
retry: {
|
retry: {
|
||||||
maximumAttempts: 1,
|
maximumAttempts: 3,
|
||||||
|
initialInterval: '30s',
|
||||||
|
backoffCoefficient: 2,
|
||||||
|
maximumInterval: '5m',
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
export const workflowSyncItemDatabase = async () => {
|
export const workflowSyncItemDatabase = async () => {
|
||||||
const { found_new } = await update_wynn_items()
|
const { found_new, itemCount } = await update_wynn_items()
|
||||||
|
return { found_new, itemCount }
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user