Garrett Weaver
04/09/2025, 8:46 PM
sort_merge, would there be any known reason why Ray might "hang" and stop scheduling tasks?
Zapier
04/10/2025, 7:33 PM
Features
- feat: strftime @universalmind303 (#4146)
- feat: adds case-sensitive and case-normalized catalog and table identifiers [1/3] @rchowell (#4149)
- feat(window): Add window partition execution @f4t4nt (#4097)
- feat: add is_t and inner property methods to DataType @universalmind303 (#4141)
- feat: add unix_timestamp function @universalmind303 (#4130)
- feat: add io config for spark @universalmind303 (#4099)
- feat: Implement flight server in rust @colin-ho (#4004)
- feat(window): Add ExtractWindowFunction optimizer rule @f4t4nt (#4093)
- feat(window): Add window function definitions and skeleton API @f4t4nt (#4082)
- feat: add dt.day_of_year @universalmind303 (#4129)
Bug Fixes
- fix: Re-enable mypy @colin-ho (#4160)
- fix: TimeUnit repr format @universalmind303 (#4164)
- fix: Fix list aggregates on empty series @desmondcheongzx (#4155)
- fix: sorting with nulls_first @universalmind303 (#4154)
- fix: ignore hugging face in broken link workflow @rchowell (#4137)
Performance
- perf: Optimize shuffle cache @colin-ho (#4101)
- perf(optimizer): anti/semi join pushdown rule @kevinzwang (#4132)
Documentation
- docs: Fix tutorial notebooks @desmondcheongzx (#4161)
- docs: documents iceberg integration @rchowell (#4070)
CI
- ci: Native doc tests @colin-ho (#4142)
Maintenance
- chore: Update bug_report.yml to use native runner @colin-ho (#4169)
- chore: simplify rust identifier to a string vector @rchowell (#4140)
Full Changelog: https://github.com/Eventual-Inc/Daft/compare/v0.4.9...v0.4.10
Release Notes: https://github.com/Eventual-Inc/Daft/releases/tag/v0.4.10
Kesav Kolla
04/10/2025, 7:45 PM
Wes Madrigal
04/11/2025, 3:19 PM
daft: https://github.com/wesmadrigal/GraphReduce
Cory Grinstead
04/11/2025, 4:27 PM
@daft.udf(return_dtype=str)
def make_greeting(a, b, greeting: str = "hello"):
    return [f"{greeting}, {x} {y}" for x, y in zip(a, b)]

df.select(make_greeting(
    col('first_name'),
    col('last_name'),
    lit(1)  # <-------------------- any way to make this error out and enforce that it's actually a string
)).collect()
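One possible approach, not from the thread: return_dtype only constrains the UDF's output, so a runtime check inside the UDF body is a simple way to force a clear error. A minimal sketch with a hypothetical make_greeting_checked, assuming expression arguments such as lit(1) reach the UDF as a daft.Series while plain Python defaults are passed through as-is:
import daft
from daft import col, lit

@daft.udf(return_dtype=str)
def make_greeting_checked(a, b, greeting="hello"):
    # Expression arguments (e.g. lit(1)) arrive as a daft.Series; plain Python
    # defaults arrive unchanged, so normalize and validate before formatting.
    if isinstance(greeting, daft.Series):
        if greeting.datatype() != daft.DataType.string():
            raise TypeError(f"greeting must be a string, got {greeting.datatype()}")
        greeting = greeting.to_pylist()[0]
    elif not isinstance(greeting, str):
        raise TypeError(f"greeting must be a str, got {type(greeting).__name__}")
    return [f"{greeting}, {x} {y}" for x, y in zip(a.to_pylist(), b.to_pylist())]
Calling this with lit(1) would then raise a TypeError at execution time instead of silently formatting the integer.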
Elvn
04/16/2025, 9:39 AM
Andrew Fuqua
04/16/2025, 1:34 PM
df with a single column of type Map<utf8, utf8>. When I try to write it to parquet, I get an error: ArrowInvalid: Map keys must be annotated as required.
print(df.schema())
╭─────────────┬─────────────────╮
│ column_name │ type            │
╞═════════════╪═════════════════╡
│ foo_map     │ Map[Utf8: Utf8] │
╰─────────────┴─────────────────╯
print(df.schema().to_pyarrow_schema())
foo_map: map<large_string, large_string>
  child 0, entries: struct<key: large_string, value: large_string> not null
      child 0, key: large_string
      child 1, value: large_string
For more context, I constructed the dataframe by reading from an existing parquet file and selecting only a map type column. Writing non-map columns from the source parquet to a new parquet file works fine. I've narrowed down the issue to columns that have map types. Any advice for how to proceed here?
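One possible workaround, not from the thread and untested: rebuild the map column in PyArrow before writing, since pa.MapArray.from_arrays always produces a map type whose key field is required. A sketch that assumes the foo_map column has no null map rows (the output file name is illustrative):
import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = df.to_arrow()  # Daft DataFrame -> pyarrow Table
idx = arrow_table.schema.get_field_index("foo_map")
map_col = arrow_table.column(idx).combine_chunks()

# Rebuild the column from its offsets/keys/items; the resulting map type marks keys as required.
# Caveat: null map rows would become empty maps because the offsets alone don't carry validity.
rebuilt = pa.MapArray.from_arrays(map_col.offsets, map_col.keys, map_col.items)
pq.write_table(arrow_table.set_column(idx, "foo_map", rebuilt), "foo_map.parquet")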
Amit
04/16/2025, 2:26 PM
ray start --head --port=6379 --node-ip-address 44.222.217.130 --dashboard-host 0.0.0.0
I even checked that all is good with ray status:
======== Autoscaler status: 2025-04-16 14:23:06.641656 ========
Node status
---------------------------------------------------------------
Active:
1 node_8da791ef7c8b8034fcb2fdd215c5aee700d6c6b8a9b2b095848a60bb
Pending:
(no pending nodes)
Recent failures:
(no failures)
Resources
---------------------------------------------------------------
Usage:
0.0/48.0 CPU
0B/259.13GiB memory
0B/111.06GiB object_store_memory
Demands:
(no resource demands)
I opened all ports from my local laptop to the machine and have access to the ray dashboard.
But when I try to run the example in the docs (https://www.getdaft.io/projects/docs/en/stable/distributed/):
import daft
daft.context.set_runner_ray(address="ray://<ray_address>:<port>")
df = daft.from_pydict({
    "a": [3, 2, 5, 6, 1, 4],
    "b": [True, False, False, True, True, False]
})
print(df)
But I keep getting the following error:
Traceback (most recent call last):
  File "/Users/amit/projects/lakesphere/Glacierops/ray.py", line 2, in <module>
    import ray
  File "/Users/amit/projects/lakesphere/Glacierops/ray.py", line 9, in <module>
    daft.context.set_runner_ray(address="ray://44.222.217.130")
  File "/Users/amit/projects/lakesphere/Glacierops/venv/lib/python3.12/site-packages/daft/context.py", line 84, in set_runner_ray
    py_ctx = _set_runner_ray(
             ^^^^^^^^^^^^^^^^
RuntimeError: Cannot set runner more than once
any help would be appreciated
Garrett Weaver
04/17/2025, 12:16 AM
INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
Yufan
04/17/2025, 9:08 AM
file writes with custom name templates: frameworks like TensorFlow impose a requirement on the output file name (e.g., https://www.tensorflow.org/datasets/api_docs/python/tfds/core/ShardedFileTemplate) in ${SHARD_INDEX}-of-${NUM_SHARDS} format, but it looks like Daft currently only supports the https://github.com/Eventual-Inc/Daft/blob/260a408fd6917ce1b09aa83b835c132510be3271/daft/recordbatch/recordbatch_io.py#L535 pattern.
I'm not able to use the to_ray_dataset converter as it requires full materialization atm, so I just wanna think out loud and see if you have any clues
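One stopgap, not from the thread: let Daft write its default part files and rename them into the sharded template afterwards. A local-filesystem sketch with an illustrative helper; the prefix, zero-padding, and extension are assumptions, not a Daft or TFDS API:
import os

def rename_to_sharded(output_dir: str, prefix: str = "data", ext: str = ".parquet") -> None:
    # Rename Daft's default part files into a "{prefix}-SSSSS-of-NNNNN{ext}" layout.
    parts = sorted(f for f in os.listdir(output_dir) if f.endswith(ext))
    num_shards = len(parts)
    for idx, name in enumerate(parts):
        new_name = f"{prefix}-{idx:05d}-of-{num_shards:05d}{ext}"
        os.rename(os.path.join(output_dir, name), os.path.join(output_dir, new_name))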
Yufan
04/17/2025, 10:10 PM
Yufan
04/21/2025, 5:13 PM
daft.read_parquet, just wonder if there's any way to configure retries for this issue. Would be great if I could get some guidance on where to patch it; I'm able to build Daft locally and happy to try any patches
Error Type: TASK_EXECUTION_EXCEPTION
User exception:
ceptions.ByteStreamError: Io error: Cached error: Misc Transient error trying to read <s3://foo.parquet>
Details:
DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: hyper::Error(Connect, Ssl(Error { code: ErrorCode(5), cause: None }, X509VerifyResult { code: 0, error: "ok" })), connection: Unknown } })
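Not from the thread: Daft's S3 client exposes retry and timeout settings on daft.io.S3Config, which may or may not cover this particular TLS/connect failure; a sketch with illustrative values:
import daft
from daft.io import IOConfig, S3Config

# Retry/timeout knobs on Daft's native S3 client; the values are illustrative.
io_config = IOConfig(
    s3=S3Config(
        num_tries=10,                   # total attempts per request
        retry_initial_backoff_ms=1000,  # backoff before the first retry
        connect_timeout_ms=30_000,
        read_timeout_ms=30_000,
    )
)
df = daft.read_parquet("s3://foo.parquet", io_config=io_config)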
ChanChan Mao
04/21/2025, 5:26 PM
Andrew Fuqua
04/21/2025, 10:59 PM
Everett Kleven
04/24/2025, 4:45 PM
Yufan
04/25/2025, 3:56 PM
Andrew Kursar
04/25/2025, 9:29 PM
yashovardhan chaturvedi
05/02/2025, 2:34 PM
Neil Wadhvana
05/04/2025, 1:55 AM
daft without needing to specify each column (since I can have anywhere from 1 to 5). Is there another syntax that would work? This is not working as is:
@daft.udf(return_dtype=daft.DataType.python())
def mean_ensemble(*depth_value_series: daft.Series) -> List[Dict[str, np.ndarray]]:
    """Apply mean ensemble to depth maps."""
    depth_value_lists = [series.to_pylist() for series in depth_value_series]
    reduced_depth_maps: List[Dict[str, np.ndarray]] = []
    for depth_value_list in depth_value_lists:
        # Calculate mean and standard deviation across all depth maps in the list
        stacked_depths = np.stack(depth_value_list, axis=0)
        mean_depth = np.mean(stacked_depths, axis=0)
        std_depth = np.std(stacked_depths, axis=0)
        reduced_depth_maps.append(
            {
                "mean": mean_depth,
                "std": std_depth,
            }
        )
    return reduced_depth_maps
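A side note, not from the thread: independent of whether Daft accepts the variadic signature, the loop above reduces each column's whole list separately. If the intent is one mean/std per row across however many columns are passed, the per-row reduction looks roughly like this plain-NumPy sketch (mean_ensemble_rows is a hypothetical name):
import numpy as np
from typing import Dict, List

def mean_ensemble_rows(*columns: List[np.ndarray]) -> List[Dict[str, np.ndarray]]:
    # Each element of `columns` holds one column's depth maps; zip(*columns)
    # walks row by row so the maps belonging to a single row are stacked together.
    results: List[Dict[str, np.ndarray]] = []
    for row_values in zip(*columns):
        stacked = np.stack(row_values, axis=0)
        results.append({"mean": stacked.mean(axis=0), "std": stacked.std(axis=0)})
    return results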
Garrett Weaver
05/14/2025, 1:05 PM
Yufan
05/21/2025, 11:29 PM
Yuri Gorokhov
05/23/2025, 9:29 PM
Attempting to downcast Map { key: Utf8, value: List(Utf8) } to "daft_core::array::list_array::ListArray"
Wondering if someone has seen this before?
Giridhar Pathak
05/25/2025, 1:37 AM
TypeError: pyarrow.lib.large_list() takes no keyword arguments
the code:
table = catalog.load_table(table)
return df.read_iceberg(table)
has anyone experienced this before?
Giridhar Pathak
05/25/2025, 10:18 PM
daft.read_table("platform.messages").filter("event_time > TIMESTAMP '2025-05-24T00:00:00Z'").limit(5).show()
running this makes the process crash. Looks like memory goes through the roof. Not sure if it's trying to read the whole table into memory.
Pre-materialization, I can get the schema just fine.
Everett Kleven
05/28/2025, 2:15 PM
Yuri Gorokhov
05/28/2025, 4:14 PM
.dropDuplicates(subset: Optional[List[str]] = None) where you can specify which columns to consider?
Pat Patterson
05/29/2025, 11:37 PM
# How many records are in the current Drive Stats dataset?
count, elapsed_time = time_collect(drivestats.count())
print(f'Total record count: {count.to_pydict()["count"][0]} ({elapsed_time:.2f} seconds)')
With the other systems I tested in my blog post, the equivalent query takes between a fraction of a second and 15 seconds. That Daft call to drivestats.count() takes 80 seconds. I'm guessing it's doing way more work than it needs to: reading the record counts from each of the 365 Parquet files rather than simply reading total-records from the most recent metadata file. Since SELECT COUNT(*) is such a common operation, I think it's worth short-circuiting the current behavior.
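Not from the thread: until Daft short-circuits this, the Iceberg metadata itself can answer COUNT(*). A rough PyIceberg sketch, assuming the current snapshot's summary carries the spec's total-records property (writers are not strictly required to set it, and the exact accessor may vary by PyIceberg version):
# `table` is the PyIceberg table already loaded for read_iceberg.
snapshot = table.current_snapshot()
if snapshot is not None and snapshot.summary is not None:
    # Assumption: the summary behaves like a mapping of string properties.
    print(f"Total record count from metadata: {int(snapshot.summary['total-records'])}")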
Giridhar Pathak
06/03/2025, 2:17 PM
Everett Kleven
06/03/2025, 2:59 PM
Pat Patterson
06/06/2025, 10:47 PM
import daft
import ray
from pyiceberg.catalog import load_catalog

ray.init("ray://head_node_host:10001", runtime_env={"pip": ["daft"]})
daft.context.set_runner_ray("ray://head_node_host:10001")

catalog = load_catalog(
    'iceberg',
    **{
        'uri': 'sqlite:///:memory:',
        # configuration to access Backblaze B2's S3-compatible API such as
        # s3.endpoint, s3.region, etc
    }
)
catalog.create_namespace('default', {'location': 's3://my-bucket/'})
table = catalog.register_table('default.drivestats', metadata_location)
drivestats = daft.read_iceberg(table)
result = drivestats.count().collect()
print(f'Total record count: {result.to_pydict()["count"][0]}')
Presumably, the code to read Parquet files from Backblaze B2 via the AWS SDK executes on the Ray cluster, so I have to either install the necessary packages there ahead of time or specify them, and environment variables, in runtime_env? For example:
ray.init("ray://<head_node_host>:10001", runtime_env={
    "pip": ["daft==0.5.2", "boto3==1.34.162", "botocore==1.34.162", ...etc...],
    "env_vars": {
        "AWS_ENDPOINT_URL": os.environ["AWS_ENDPOINT_URL"],
        "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
        "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
        ...etc...
    }
})
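Not from the thread: an alternative to shipping AWS environment variables through runtime_env is to pass the credentials explicitly via Daft's IO config, which is carried along with the query to the workers. A sketch, assuming daft.io.S3Config for the B2 S3-compatible endpoint and the `table` object from the snippet above; the region value is illustrative:
import os
import daft
from daft.io import IOConfig, S3Config

# Credentials and endpoint passed explicitly instead of via worker env vars.
io_config = IOConfig(
    s3=S3Config(
        endpoint_url=os.environ["AWS_ENDPOINT_URL"],
        key_id=os.environ["AWS_ACCESS_KEY_ID"],
        access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name="us-west-004",  # illustrative Backblaze B2 region
    )
)
drivestats = daft.read_iceberg(table, io_config=io_config)
print(drivestats.count().collect().to_pydict())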